This document describes the current stable version of Celery (3.1). For development docs, go here.
# -*- coding: utf-8 -*-
"""
celery.backends.database.session
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
SQLAlchemy sessions.
"""
from __future__ import absolute_import
from collections import defaultdict
from multiprocessing.util import register_after_fork
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
ResultModelBase = declarative_base()
_SETUP = defaultdict(lambda: False)
_ENGINES = {}
_SESSIONS = {}
__all__ = ['ResultSession', 'get_engine', 'create_session']
class _after_fork(object):
registered = False
def __call__(self):
self.registered = False # child must reregister
for engine in list(_ENGINES.values()):
engine.dispose()
_ENGINES.clear()
_SESSIONS.clear()
after_fork = _after_fork()
[docs]def get_engine(dburi, **kwargs):
try:
return _ENGINES[dburi]
except KeyError:
engine = _ENGINES[dburi] = create_engine(dburi, **kwargs)
after_fork.registered = True
register_after_fork(after_fork, after_fork)
return engine
[docs]def create_session(dburi, short_lived_sessions=False, **kwargs):
engine = get_engine(dburi, **kwargs)
if short_lived_sessions or dburi not in _SESSIONS:
_SESSIONS[dburi] = sessionmaker(bind=engine)
return engine, _SESSIONS[dburi]
def setup_results(engine):
if not _SETUP['results']:
ResultModelBase.metadata.create_all(engine)
_SETUP['results'] = True
[docs]def ResultSession(dburi, **kwargs):
engine, session = create_session(dburi, **kwargs)
setup_results(engine)
return session()