[ https://issues.apache.org/jira/browse/AIRFLOW-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Daniel Standish updated AIRFLOW-4134: ------------------------------------- Description: I am finding with 1.10.2 that I seem to get a warning {{DB connection invalidated. Reconnecting...}} very frequently. To try to diagnose this, I added logging of the triggering error on line 79 in airflow/utils/sqlalchemy.py, from which this warning is generated. Call stack: {code} Call stack: File "/usr/local/bin/airflow", line 32, in <module> args.func(args) File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, in wrapper return f(*args, **kwargs) File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 992, in scheduler job.run() File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 205, in run self._execute() File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1532, in _execute self._execute_helper() File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1562, in _execute_helper self.processor_agent.start() File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 511, in start self._async_mode) File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 565, in _launch_process p.start() File "/usr/local/lib/python3.6/multiprocessing/process.py", line 105, in start self._popen = self._Popen(self) File "/usr/local/lib/python3.6/multiprocessing/context.py", line 223, in _Popen return _default_context.get_context().Process._Popen(process_obj) File "/usr/local/lib/python3.6/multiprocessing/context.py", line 277, in _Popen return Popen(process_obj) File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 19, in __init__ self._launch(process_obj) File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 73, in _launch code = process_obj._bootstrap() File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap self.run() File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run 
self._target(*self._args, **self._kwargs) File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 560, in helper processor_manager.start() File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 797, in start self.start_in_async() File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 820, in start_in_async simple_dags = self.heartbeat() File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 1190, in heartbeat zombies = self._find_zombies() File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 73, in wrapper return func(*args, **kwargs) File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 1236, in _find_zombies LJ.latest_heartbeat < limit_dttm, File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 2925, in all return list(self) File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3081, in __iter__ return self._execute_and_instances(context) File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3103, in _execute_and_instances querycontext, self._connection_from_session, close_with_result=True File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3111, in _get_bind_args mapper=self._bind_mapper(), clause=querycontext.statement, **kw File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3096, in _connection_from_session conn = self.session.connection(**kw) File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1120, in connection execution_options=execution_options, File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1126, in _connection_for_bind engine, execution_options File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 424, in _connection_for_bind conn = bind.contextual_connect() File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 2194, in contextual_connect **kwargs File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 125, in __init__ self.dispatch.engine_connect(self, self.__branch) File "/usr/local/lib/python3.6/site-packages/sqlalchemy/event/attr.py", line 297, in __call__ fn(*args, **kw) File "/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", line 79, in ping_connection log.warning("DB connection invalidated. Reconnecting...", err) Message: 'DB connection invalidated. Reconnecting...' Arguments: (OperationalError('(psycopg2.OperationalError) server closed the connection unexpectedly\n\tThis probably means the server terminated abnormally\n\tbefore or while processing the request.\n',),) {code} Traceback: {code} Traceback (most recent call last): File "/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", line 68, in ping_connection connection.scalar(select([1])) File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 912, in scalar return self.execute(object_, *multiparams, **params).scalar() File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 980, in execute return meth(self, multiparams, params) File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 273, in _execute_on_connection return connection._execute_clauseelement(self, multiparams, params) File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1099, in _execute_clauseelement distilled_params, File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1240, in _execute_context e, statement, parameters, cursor, context File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1458, in _handle_dbapi_exception util.raise_from_cause(sqlalchemy_exception, exc_info) File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 296, in raise_from_cause 
reraise(type(exception), exception, tb=exc_tb, cause=cause) File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 276, in reraise raise value.with_traceback(tb) File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1236, in _execute_context cursor, statement, parameters, context File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 536, in do_execute cursor.execute(statement, parameters) sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. [SQL: 'SELECT 1'] (Background on this error at: http://sqlalche.me/e/e3q8) {code} It has something to do with the configure_orm function in airflow/settings.py, because that is the only usage of setup_event_handlers (from airflow/utils/sqlalchemy.py). And if I disable connection pooling, then the warning seems to go away. Beyond that, I am not sure where to go from here. But something must be wrong. was: I am finding with 1.10.2 that I seem to get a warning {{DB connection invalidated. Reconnecting...}} very frequently. To try to diagnose this, I added logging of the triggering error on line 79 in airflow/utils/sqlalchemy.py, from which this warning is generated. 
Call stack: {code} webserver_1 | Call stack: webserver_1 | File "/usr/local/bin/airflow", line 32, in <module> webserver_1 | args.func(args) webserver_1 | File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, in wrapper webserver_1 | return f(*args, **kwargs) webserver_1 | File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 992, in scheduler webserver_1 | job.run() webserver_1 | File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 205, in run webserver_1 | self._execute() webserver_1 | File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1532, in _execute webserver_1 | self._execute_helper() webserver_1 | File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1562, in _execute_helper webserver_1 | self.processor_agent.start() webserver_1 | File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 511, in start webserver_1 | self._async_mode) webserver_1 | File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 565, in _launch_process webserver_1 | p.start() webserver_1 | File "/usr/local/lib/python3.6/multiprocessing/process.py", line 105, in start webserver_1 | self._popen = self._Popen(self) webserver_1 | File "/usr/local/lib/python3.6/multiprocessing/context.py", line 223, in _Popen webserver_1 | return _default_context.get_context().Process._Popen(process_obj) webserver_1 | File "/usr/local/lib/python3.6/multiprocessing/context.py", line 277, in _Popen webserver_1 | return Popen(process_obj) webserver_1 | File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 19, in __init__ webserver_1 | self._launch(process_obj) webserver_1 | File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 73, in _launch webserver_1 | code = process_obj._bootstrap() webserver_1 | File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap webserver_1 | self.run() webserver_1 | File 
"/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run webserver_1 | self._target(*self._args, **self._kwargs) webserver_1 | File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 560, in helper webserver_1 | processor_manager.start() webserver_1 | File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 797, in start webserver_1 | self.start_in_async() webserver_1 | File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 820, in start_in_async webserver_1 | simple_dags = self.heartbeat() webserver_1 | File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 1190, in heartbeat webserver_1 | zombies = self._find_zombies() webserver_1 | File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 73, in wrapper webserver_1 | return func(*args, **kwargs) webserver_1 | File "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", line 1236, in _find_zombies webserver_1 | LJ.latest_heartbeat < limit_dttm, webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 2925, in all webserver_1 | return list(self) webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3081, in __iter__ webserver_1 | return self._execute_and_instances(context) webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3103, in _execute_and_instances webserver_1 | querycontext, self._connection_from_session, close_with_result=True webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3111, in _get_bind_args webserver_1 | mapper=self._bind_mapper(), clause=querycontext.statement, **kw webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3096, in _connection_from_session webserver_1 | conn = self.session.connection(**kw) webserver_1 | File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1120, in connection webserver_1 | execution_options=execution_options, webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1126, in _connection_for_bind webserver_1 | engine, execution_options webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 424, in _connection_for_bind webserver_1 | conn = bind.contextual_connect() webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 2194, in contextual_connect webserver_1 | **kwargs webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 125, in __init__ webserver_1 | self.dispatch.engine_connect(self, self.__branch) webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/event/attr.py", line 297, in __call__ webserver_1 | fn(*args, **kw) webserver_1 | File "/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", line 79, in ping_connection webserver_1 | log.warning("DB connection invalidated. Reconnecting...", err) webserver_1 | Message: 'DB connection invalidated. Reconnecting...' 
webserver_1 | Arguments: (OperationalError('(psycopg2.OperationalError) server closed the connection unexpectedly\n\tThis probably means the server terminated abnormally\n\tbefore or while processing the request.\n',),) {code} Traceback: {code} webserver_1 | Traceback (most recent call last): webserver_1 | File "/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", line 68, in ping_connection webserver_1 | connection.scalar(select([1])) webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 912, in scalar webserver_1 | return self.execute(object_, *multiparams, **params).scalar() webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 980, in execute webserver_1 | return meth(self, multiparams, params) webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 273, in _execute_on_connection webserver_1 | return connection._execute_clauseelement(self, multiparams, params) webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1099, in _execute_clauseelement webserver_1 | distilled_params, webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1240, in _execute_context webserver_1 | e, statement, parameters, cursor, context webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1458, in _handle_dbapi_exception webserver_1 | util.raise_from_cause(sqlalchemy_exception, exc_info) webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 296, in raise_from_cause webserver_1 | reraise(type(exception), exception, tb=exc_tb, cause=cause) webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 276, in reraise webserver_1 | raise value.with_traceback(tb) webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1236, in _execute_context 
webserver_1 | cursor, statement, parameters, context webserver_1 | File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 536, in do_execute webserver_1 | cursor.execute(statement, parameters) webserver_1 | sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly webserver_1 | This probably means the server terminated abnormally webserver_1 | before or while processing the request. webserver_1 | [SQL: 'SELECT 1'] (Background on this error at: http://sqlalche.me/e/e3q8) {code} It has something to do with the configure_orm function in airflow/settings.py, because that is the only usage of setup_event_handlers (from airflow/utils/sqlalchemy.py). And if I disable connection pooling, then the warning seems to go away. Beyond that, I am not sure where to go from here. But something must be wrong. > "DB connection invalidated" warning at every zombie check > --------------------------------------------------------- > > Key: AIRFLOW-4134 > URL: https://issues.apache.org/jira/browse/AIRFLOW-4134 > Project: Apache Airflow > Issue Type: Bug > Affects Versions: 1.10.2 > Reporter: Daniel Standish > Priority: Major > > I am finding with 1.10.2 that I seem to get a warning {{DB connection > invalidated. Reconnecting...}} very frequently. > To try to diagnose this, I added logging of the triggering error on line 79 in > airflow/utils/sqlalchemy.py, from which this warning is generated. 
> Call stack: > {code} > Call stack: > File "/usr/local/bin/airflow", line 32, in <module> > args.func(args) > File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line > 74, in wrapper > return f(*args, **kwargs) > File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 992, > in scheduler > job.run() > File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 205, in > run > self._execute() > File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1532, > in _execute > self._execute_helper() > File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1562, > in _execute_helper > self.processor_agent.start() > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 511, in start > self._async_mode) > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 565, in _launch_process > p.start() > File "/usr/local/lib/python3.6/multiprocessing/process.py", line 105, in > start > self._popen = self._Popen(self) > File "/usr/local/lib/python3.6/multiprocessing/context.py", line 223, in > _Popen > return _default_context.get_context().Process._Popen(process_obj) > File "/usr/local/lib/python3.6/multiprocessing/context.py", line 277, in > _Popen > return Popen(process_obj) > File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 19, in > __init__ > self._launch(process_obj) > File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 73, in > _launch > code = process_obj._bootstrap() > File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in > _bootstrap > self.run() > File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run > self._target(*self._args, **self._kwargs) > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 560, in helper > processor_manager.start() > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 797, 
in start > self.start_in_async() > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 820, in start_in_async > simple_dags = self.heartbeat() > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 1190, in heartbeat > zombies = self._find_zombies() > File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 73, > in wrapper > return func(*args, **kwargs) > File > "/usr/local/lib/python3.6/site-packages/airflow/utils/dag_processing.py", > line 1236, in _find_zombies > LJ.latest_heartbeat < limit_dttm, > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line > 2925, in all > return list(self) > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line > 3081, in __iter__ > return self._execute_and_instances(context) > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line > 3103, in _execute_and_instances > querycontext, self._connection_from_session, close_with_result=True > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line > 3111, in _get_bind_args > mapper=self._bind_mapper(), clause=querycontext.statement, **kw > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line > 3096, in _connection_from_session > conn = self.session.connection(**kw) > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", > line 1120, in connection > execution_options=execution_options, > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", > line 1126, in _connection_for_bind > engine, execution_options > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", > line 424, in _connection_for_bind > conn = bind.contextual_connect() > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", > line 2194, in contextual_connect > **kwargs > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", > line 125, in 
__init__ > self.dispatch.engine_connect(self, self.__branch) > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/event/attr.py", > line 297, in __call__ > fn(*args, **kw) > File "/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", > line 79, in ping_connection > log.warning("DB connection invalidated. Reconnecting...", err) > Message: 'DB connection invalidated. Reconnecting...' > Arguments: (OperationalError('(psycopg2.OperationalError) server closed the > connection unexpectedly\n\tThis probably means the server terminated > abnormally\n\tbefore or while processing the request.\n',),) > {code} > Traceback: > {code} > Traceback (most recent call last): > File "/usr/local/lib/python3.6/site-packages/airflow/utils/sqlalchemy.py", > line 68, in ping_connection > connection.scalar(select([1])) > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", > line 912, in scalar > return self.execute(object_, *multiparams, **params).scalar() > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", > line 980, in execute > return meth(self, multiparams, params) > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", > line 273, in _execute_on_connection > return connection._execute_clauseelement(self, multiparams, params) > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", > line 1099, in _execute_clauseelement > distilled_params, > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", > line 1240, in _execute_context > e, statement, parameters, cursor, context > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", > line 1458, in _handle_dbapi_exception > util.raise_from_cause(sqlalchemy_exception, exc_info) > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", > line 296, in raise_from_cause > reraise(type(exception), exception, tb=exc_tb, cause=cause) > File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", > line 276, in reraise > raise value.with_traceback(tb) > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", > line 1236, in _execute_context > cursor, statement, parameters, context > File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", > line 536, in do_execute > cursor.execute(statement, parameters) > sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed > the connection unexpectedly > This probably means the server terminated abnormally > before or while processing the request. > [SQL: 'SELECT 1'] (Background on this error at: http://sqlalche.me/e/e3q8) > {code} > It has something to do with the configure_orm function in > airflow/settings.py, because that is the only usage of setup_event_handlers > (from airflow/utils/sqlalchemy.py). > And if I disable connection pooling, then the warning seems to go away. > Beyond that, I am not sure where to go from here. But something must be > wrong. -- This message was sent by Atlassian JIRA (v7.6.3#76005)