[ https://issues.apache.org/jira/browse/AIRFLOW-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ash Berlin-Taylor resolved AIRFLOW-4134.
----------------------------------------
    Resolution: Duplicate

> "DB connection invalidated" warning raised every JOB_HEARTBEAT_SEC
> ------------------------------------------------------------------
>
>                 Key: AIRFLOW-4134
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4134
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: 1.10.2
>            Reporter: Daniel Standish
>            Priority: Major
>
> I am finding with 1.10.2 that I seem to get a warning {{DB connection invalidated. Reconnecting...}} very frequently. It seems to coincide closely with my job_heartbeat_sec parameter (5 seconds).
> I have tried to diagnose it: I added logging of the triggering error on line 79 in airflow/utils/sqlalchemy.py, from which this warning is generated.
> Looks like it is related to the zombie check.
> Call stack:
> {code}
> Call stack:
>   File "/usr/local/bin/airflow", line 32, in <module>
>     args.func(args)
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, in wrapper
>     return f(*args, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 992, in scheduler
>     job.run()
>   File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 205, in run
>     self._execute()
>   File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1532, in _execute
>     self._execute_helper()
>   File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1562, in _execute_helper
>     self.processor_agent.start()
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 511, in start
>     self._async_mode)
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 565, in _launch_process
>     p.start()
>   File "/usr/local/lib/python3.6/multiprocessing/process.py", line 105, in start
>     self._popen = self._Popen(self)
>   File "/usr/local/lib/python3.6/multiprocessing/context.py", line 223, in _Popen
>     return _default_context.get_context().Process._Popen(process_obj)
>   File "/usr/local/lib/python3.6/multiprocessing/context.py", line 277, in _Popen
>     return Popen(process_obj)
>   File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 19, in __init__
>     self._launch(process_obj)
>   File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 73, in _launch
>     code = process_obj._bootstrap()
>   File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
>     self.run()
>   File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
>     self._target(*self._args, **self._kwargs)
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 560, in helper
>     processor_manager.start()
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 797, in start
>     self.start_in_async()
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 820, in start_in_async
>     simple_dags = self.heartbeat()
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 1190, in heartbeat
>     zombies = self._find_zombies()
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 73, in wrapper
>     return func(*args, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 1236, in _find_zombies
>     LJ.latest_heartbeat < limit_dttm,
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 2925, in all
>     return list(self)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3081, in __iter__
>     return self._execute_and_instances(context)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3103, in _execute_and_instances
>     querycontext, self._connection_from_session, close_with_result=True
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3111, in _get_bind_args
>     mapper=self._bind_mapper(), clause=querycontext.statement, **kw
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3096, in _connection_from_session
>     conn = self.session.connection(**kw)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1120, in connection
>     execution_options=execution_options,
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1126, in _connection_for_bind
>     engine, execution_options
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 424, in _connection_for_bind
>     conn = bind.contextual_connect()
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 2194, in contextual_connect
>     **kwargs
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 125, in __init__
>     self.dispatch.engine_connect(self, self.__branch)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/event/attr.py", line 297, in __call__
>     fn(*args, **kw)
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", line 79, in ping_connection
>     log.warning("DB connection invalidated. Reconnecting...", err)
> Message: 'DB connection invalidated. Reconnecting...'
> Arguments: (OperationalError('(psycopg2.OperationalError) server closed the connection unexpectedly\n\tThis probably means the server terminated abnormally\n\tbefore or while processing the request.\n',),)
> {code}
> Traceback:
> {code}
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", line 68, in ping_connection
>     connection.scalar(select([1]))
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 912, in scalar
>     return self.execute(object_, *multiparams, **params).scalar()
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 980, in execute
>     return meth(self, multiparams, params)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 273, in _execute_on_connection
>     return connection._execute_clauseelement(self, multiparams, params)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1099, in _execute_clauseelement
>     distilled_params,
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1240, in _execute_context
>     e, statement, parameters, cursor, context
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1458, in _handle_dbapi_exception
>     util.raise_from_cause(sqlalchemy_exception, exc_info)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 296, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 276, in reraise
>     raise value.with_traceback(tb)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1236, in _execute_context
>     cursor, statement, parameters, context
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 536, in do_execute
>     cursor.execute(statement, parameters)
> sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
>     This probably means the server terminated abnormally
>     before or while processing the request.
>  [SQL: 'SELECT 1'] (Background on this error at: http://sqlalche.me/e/e3q8)
> {code}
> If I disable connection pooling, then the warning seems to go away.
> Beyond that, I am not sure where to go from here. But something must be wrong.
> Currently running with a variation of puckel/docker, but it seems to happen on every setup with 1.10.2.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
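
For readers following the call stack above: the warning is logged when the `SELECT 1` probe in `ping_connection` fails on a connection about to be used. The following is a toy, standard-library-only sketch of that ping-then-reconnect pattern, offered as an illustration rather than as Airflow's or SQLAlchemy's actual code. It assumes `sqlite3` as a stand-in for psycopg2, and `PingingPool`, `checkout`, and `connect` are hypothetical names that do not appear in the report.

```python
import logging
import sqlite3

log = logging.getLogger("pool_ping_sketch")


class PingingPool:
    """Toy single-connection pool (hypothetical) that pings before reuse."""

    def __init__(self, connect):
        self._connect = connect  # zero-argument factory returning a connection
        self._conn = None
        self.reconnects = 0      # number of stale connections replaced

    def checkout(self):
        if self._conn is None:
            # First use: nothing to ping yet, just open a connection.
            self._conn = self._connect()
            return self._conn
        try:
            # Liveness probe, the same statement shown in the traceback.
            self._conn.execute("SELECT 1").fetchone()
        except sqlite3.Error as err:
            # Stale connection: warn, discard it, and open a fresh one.
            log.warning("DB connection invalidated. Reconnecting... (%s)", err)
            self.reconnects += 1
            self._conn = self._connect()
        return self._conn


def connect():
    # In-memory database so the sketch needs no external server.
    return sqlite3.connect(":memory:")


pool = PingingPool(connect)
first = pool.checkout()
first.close()              # simulate the server closing the connection
second = pool.checkout()   # probe fails, so the pool warns and reconnects
value = second.execute("SELECT 1").fetchone()[0]
third = pool.checkout()    # probe succeeds, so the connection is reused
```

In this sketch a warning is emitted only when a reused connection fails its probe, which is why a warning on every heartbeat suggests that each reused connection is already dead when it is handed out. Whether that matches the cause in this issue is not established by the report.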