[AIRFLOW-1559] Dispose SQLAlchemy engines on exit When a forked process or the entire interpreter terminates, we have to close all pooled database connections. The database can run out of connections otherwise. At a minimum, it will print errors in its log file.
By using an atexit handler we ensure that connections are closed for each interpreter and Gunicorn worker termination. Only usages of multiprocessing.Process require special handling as those terminate via os._exit() which does not run finalizers. This commit is based on a contribution by @dhuang https://github.com/apache/incubator-airflow/pull/2767 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6bf1a6ed Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6bf1a6ed Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6bf1a6ed Branch: refs/heads/master Commit: 6bf1a6edaf13d3e255c47488f2747a2b8ebeff6c Parents: 5a303eb Author: Stephan Erb <[email protected]> Authored: Sat Nov 25 22:21:28 2017 +0100 Committer: Stephan Erb <[email protected]> Committed: Wed Nov 29 09:49:57 2017 +0100 ---------------------------------------------------------------------- airflow/jobs.py | 3 +++ airflow/settings.py | 19 +++++++++++++++++++ 2 files changed, 22 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6bf1a6ed/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 868e785..91ab96c 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -384,6 +384,9 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin): finally: sys.stdout = sys.__stdout__ sys.stderr = sys.__stderr__ + # We re-initialized the ORM within this Process above so we need to + # tear it down manually here + settings.dispose_orm() p = multiprocessing.Process(target=helper, args=(), http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6bf1a6ed/airflow/settings.py ---------------------------------------------------------------------- diff --git a/airflow/settings.py b/airflow/settings.py index 
5559646..04d3548 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -17,6 +17,7 @@ from __future__ import division from __future__ import print_function from __future__ import unicode_literals +import atexit import logging import os import pendulum @@ -136,6 +137,7 @@ def configure_vars(): def configure_orm(disable_connection_pool=False): + log.debug("Setting up DB connection pool (PID %s)" % os.getpid()) global engine global Session engine_args = {} @@ -154,6 +156,20 @@ def configure_orm(disable_connection_pool=False): sessionmaker(autocommit=False, autoflush=False, bind=engine)) +def dispose_orm(): + """ Properly close pooled database connections """ + log.debug("Disposing DB connection pool (PID %s)", os.getpid()) + global engine + global Session + + if Session: + Session.remove() + Session = None + if engine: + engine.dispose() + engine = None + + def configure_adapters(): from pendulum import Pendulum try: @@ -180,6 +196,9 @@ configure_vars() configure_adapters() configure_orm() +# Ensure we close DB connections at scheduler and gunicorn worker terminations +atexit.register(dispose_orm) + # Const stuff KILOBYTE = 1024
