wjddn279 opened a new pull request, #56044: URL: https://github.com/apache/airflow/pull/56044
Hello! While using the latest version of Airflow (3.0.6), I discovered a new bug. In this PR I would like to provide the root cause, possible solutions, and a fix. Since the full explanation is quite long, I had to omit some details — please feel free to leave comments if you’d like clarification. I also plan to add test cases in line with your feedback.

## Problem

I deployed Airflow in a Kubernetes environment and observed that the dag-processor was restarting irregularly. Upon checking the error logs, I found the following issues (full log file attached: [full_log.txt](https://github.com/user-attachments/files/22513572/full_log.txt)):

<details>
<summary>error log 1</summary>

```
[2025-09-24T17:01:10.824+0900] {retries.py:95} DEBUG - Running DagWarning.purge_inactive_dag_warnings with retries. Try 1 of 3
[2025-09-24T17:01:10.827+0900] {dag_processor_job_runner.py:63} ERROR - Exception when executing DagProcessorJob
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
MySQLdb.OperationalError: (2013, 'Lost connection to server during query')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
    self.processor.run()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py", line 269, in run
    return self._run_parsing_loop()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py", line 340, in _run_parsing_loop
    self._queue_requested_files_for_parsing()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py", line 408, in _queue_requested_files_for_parsing
    files = self._get_priority_files()
            ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 101, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py", line 426, in _get_priority_files
    requests = session.scalars(
               ^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1778, in scalars
    return self.execute(
           ^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1717, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
  File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to server during query')
[SQL: SELECT dag_priority_parsing_request.id, dag_priority_parsing_request.bundle_name, dag_priority_parsing_request.relative_fileloc FROM dag_priority_parsing_request WHERE dag_priority_parsing_request.bundle_name IN (%s)]
[parameters: ('dags-folder',)]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
2025-09-24 17:01:10 [info ] Process exited [supervisor] exit_code=<Negsignal.SIGTERM: -15> pid=268 signal_sent=SIGTERM
2025-09-24 17:01:10 [info ] Process exited [supervisor] exit_code=<Negsignal.SIGTERM: -15> pid=270 signal_sent=SIGTERM
[2025-09-24T17:01:10.860+0900] {process_utils.py:285} INFO - Waiting up to 5 seconds for processes to exit...
[2025-09-24T17:01:10.860+0900] {listener.py:37} DEBUG - Calling 'before_stopping' with {'component': <airflow.jobs.job.Job object at 0x7f4bbf19ad90>}
[2025-09-24T17:01:10.860+0900] {listener.py:38} DEBUG - Hook impls: []
[2025-09-24T17:01:10.861+0900] {listener.py:42} DEBUG - Result from 'before_stopping': []
[2025-09-24T17:01:10.876+0900] {cli_action_loggers.py:97} DEBUG - Calling callbacks: []
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
MySQLdb.OperationalError: (2013, 'Lost connection to server during query')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 7, in <module>
    sys.exit(main())
             ^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/__main__.py", line 55, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/cli_config.py", line 48, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/cli.py", line 112, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/dag_processor_command.py", line 54, in dag_processor
    run_command_with_daemon_option(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/dag_processor_command.py", line 57, in <lambda>
    callback=lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute),
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 101, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py", line 347, in run_job
    return execute_job(job, execute_callable=execute_callable)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py", line 376, in execute_job
    ret = execute_callable()
          ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
    self.processor.run()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py", line 269, in run
    return self._run_parsing_loop()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py", line 340, in _run_parsing_loop
    self._queue_requested_files_for_parsing()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py", line 408, in _queue_requested_files_for_parsing
    files = self._get_priority_files()
            ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 101, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py", line 426, in _get_priority_files
    requests = session.scalars(
               ^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1778, in scalars
    return self.execute(
           ^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1717, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to server during query')
[SQL: SELECT dag_priority_parsing_request.id, dag_priority_parsing_request.bundle_name, dag_priority_parsing_request.relative_fileloc FROM dag_priority_parsing_request WHERE dag_priority_parsing_request.bundle_name IN (%s)]
[parameters: ('dags-folder',)]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
[2025-09-24T17:01:10.882+0900] {settings.py:494} DEBUG - Disposing DB connection pool (PID 7)
```

</details>

<details>
<summary>error log 2</summary>

```
gw_contents_server_tech.daily.gw_daily_webtoon_episode_video to the DB
[2025-09-24T16:57:52.086+0900] {before_sleep.py:65} DEBUG - Retrying <unknown> in 0.24866784005501685 seconds as it raised OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to server during query')
[SQL: SELECT ab_permission_view.permission_id AS ab_permission_view_permission_id, ab_permission_view.view_menu_id AS ab_permission_view_view_menu_id, ab_permission_view.id AS ab_permission_view_id, ab_permission_1.id AS ab_permission_1_id, ab_permission_1.name AS ab_permission_1_name, ab_view_menu_1.id AS ab_view_menu_1_id, ab_view_menu_1.name AS ab_view_menu_1_name FROM ab_permission_view LEFT OUTER JOIN ab_permission AS ab_permission_1 ON ab_permission_1.id = ab_permission_view.permission_id LEFT OUTER JOIN ab_view_menu AS ab_view_menu_1 ON ab_view_menu_1.id = ab_permission_view.view_menu_id WHERE %s = ab_permission_view.permission_id AND %s = ab_permission_view.view_menu_id]
[parameters: (11, 190)]
(Background on this error at: https://sqlalche.me/e/14/e3q8).
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
MySQLdb.OperationalError: (2013, 'Lost connection to server during query')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py", line 355, in update_dag_parsing_results_in_db
    _serialize_dag_capturing_errors(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py", line 203, in _serialize_dag_capturing_errors
    _sync_dag_perms(dag, session=session)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py", line 223, in _sync_dag_perms
    security_manager.sync_perm_for_dag(dag_id, dag.access_control)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/fab/auth_manager/security_manager/override.py", line 975, in sync_perm_for_dag
    self.create_permission(dag_action_name, dag_resource_name)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/fab/auth_manager/security_manager/override.py", line 1630, in create_permission
    perm = self.get_permission(action_name, resource_name)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/fab/auth_manager/security_manager/override.py", line 1607, in get_permission
    .one_or_none()
     ^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 2850, in one_or_none
    return self._iter().one_or_none()
           ^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 2916, in _iter
    result = self.session.execute(
             ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1717, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to server during query')
[SQL: SELECT ab_permission_view.permission_id AS ab_permission_view_permission_id, ab_permission_view.view_menu_id AS ab_permission_view_view_menu_id, ab_permission_view.id AS ab_permission_view_id, ab_permission_1.id AS ab_permission_1_id, ab_permission_1.name AS ab_permission_1_name, ab_view_menu_1.id AS ab_view_menu_1_id, ab_view_menu_1.name AS ab_view_menu_1_name FROM ab_permission_view LEFT OUTER JOIN ab_permission AS ab_permission_1 ON ab_permission_1.id = ab_permission_view.permission_id LEFT OUTER JOIN ab_view_menu AS ab_view_menu_1 ON ab_view_menu_1.id = ab_permission_view.view_menu_id WHERE %s = ab_permission_view.permission_id AND %s = ab_permission_view.view_menu_id]
[parameters: (11, 190)]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
[2025-09-24T16:57:52.341+0900] {collection.py:344} DEBUG - Running dagbag.bulk_write_to_db with retries. Try 2 of 3
```

</details>

As seen in error log 1, the `_get_priority_files()` function executes a query during which the MySQL connection is unexpectedly closed. This raises an exception and causes the dag-processor to exit. I also identified other exceptions (error log 2). While those do not lead to termination (they are covered by retry logic and try/except blocks), they appear to be caused by the same underlying issue: sudden termination of MySQL connections.
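The `Try 1 of 3` / `Try 2 of 3` lines show why the error-log-2 call sites survive: they sit behind a retry wrapper, while `_get_priority_files()` has none. The shape of that protection, as a rough stdlib sketch (the helper name and backoff policy here are mine; Airflow's actual retry helpers are built on tenacity and are narrowed to database errors):

```python
import logging
import time

log = logging.getLogger(__name__)


def run_with_db_retries(fn, max_tries: int = 3, base_delay: float = 0.25):
    """Retry a DB operation a few times before giving up, mirroring the
    'Running ... with retries. Try N of 3' lines in the logs above.
    Hypothetical sketch; in Airflow the except clause is narrowed to
    sqlalchemy.exc.OperationalError rather than bare Exception."""
    for attempt in range(1, max_tries + 1):
        try:
            return fn()
        except Exception as exc:
            log.debug("Try %d of %d failed: %s", attempt, max_tries, exc)
            if attempt == max_tries:
                raise  # out of retries: propagate the last error
            time.sleep(base_delay * attempt)  # simple backoff before the next try
```

A call site wrapped this way merely logs the `Lost connection` error and retries on a fresh attempt, which is why error log 2 does not kill the process, whereas the unprotected query in `_get_priority_files()` does.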
By reviewing the queries arriving at MySQL around the time of the errors, I confirmed that the connection was indeed being closed with a `Quit` command.

<img width="1062" height="138" alt="Image" src="https://github.com/user-attachments/assets/c6df1ef8-0d01-4e55-9aef-111771741f67" />

## Root Cause

### Why are DB connections suddenly closed?

At first, I suspected an issue with MySQL itself. However, since the exact same database worked fine with Airflow 2.10.2 and only started failing after upgrading to 3.0.6, I concluded the root cause lies in Airflow's application code. To investigate, I modified the code to track when connection objects were finalized, by attaching a finalizer whenever a connection is created:

<details>
<summary>code</summary>

```python
@event.listens_for(engine, "connect")
def set_mysql_timezone(dbapi_connection, connection_record):
    log.debug(f"[connect] New DB connection established, id={os.getpid()}")
    weakref.finalize(
        dbapi_connection,
        lambda: print(f"{datetime.now().isoformat()} dbapi_connection finalized via weakref in os {os.getpid()}"),
    )
    weakref.finalize(
        connection_record,
        lambda: print(f"{datetime.now().isoformat()} connection_record finalized via weakref in os {os.getpid()}"),
    )
```

</details>

After running with this change, I observed that during dag parsing, when the dag-processor forks child processes, the following logs appeared — exactly when MySQL issued the `Quit` command (PID 417 is the subprocess):

```
2025-09-22T13:41:30.352393 connection_record finalized via weakref in os 417
2025-09-22T13:41:30.352403 dbapi_connection finalized via weakref in os 417
```

This shows that in the forked subprocess, connection objects inherited from the parent process were garbage-collected, which triggered the `Quit` command and closed the underlying DB connection. As a result, the parent process attempts to reuse a connection that has already been closed by its child process — leading to failures (e.g., a pre-ping failure or query execution on a closed connection).
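This mechanism can be reproduced without MySQL at all. The sketch below (POSIX-only, since it uses `os.fork()`; `FakeConnection` and the pipe are illustrative stand-ins for a DBAPI connection and its socket) shows a finalizer registered in the parent firing inside the forked child as soon as a cyclic-GC pass runs there:

```python
import gc
import os
import weakref


class FakeConnection:
    """Stand-in for a DBAPI connection. In the real bug, finalizing the
    connection object sends MySQL a Quit command over the shared socket."""


def leak_unreachable_cycle(write_fd: int) -> None:
    conn = FakeConnection()
    conn.self_ref = conn  # reference cycle: only the cyclic GC can reclaim it
    weakref.finalize(
        conn, lambda fd=write_fd: os.write(fd, b"finalized in pid %d" % os.getpid())
    )
    # `conn` becomes unreachable when we return, but it is NOT collected yet:
    # the cycle has to wait for a cyclic-GC pass.


read_fd, write_fd = os.pipe()
gc.disable()  # ensure collection only happens when we explicitly ask for it
leak_unreachable_cycle(write_fd)

pid = os.fork()
if pid == 0:
    # Child: inherits a copy of the uncollected cycle. One GC pass (triggered
    # in the real bug by parsing a large dag file) runs the finalizer HERE,
    # even though the "connection" was created by the parent.
    gc.collect()
    os._exit(0)

os.waitpid(pid, 0)
child_report = os.read(read_fd, 1024)  # what the child's finalizer wrote
print(child_report.decode())
```

The parent never ran `gc.collect()`, yet the report on the pipe shows the finalizer executed in the child's PID: exactly the pattern in the `finalized via weakref in os 417` logs above.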
### Why are the connection objects garbage-collected?

The cause lies in this code: https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/settings.py#L426-L436

The call to `engine.dispose(close=False)` replaces the `engine.pool` object. Once the engine's pool is replaced, existing connections are detached from pool management and retained only through weak references, so they are no longer protected from garbage collection in forked child processes. Once GC runs in the subprocess, those connections are finalized and the underlying DB connection is closed — which then affects the parent process.

However, this issue does not occur in every case. In most child processes, GC is never triggered before the process exits, so no problem occurs. But if a dag file is large enough to push memory usage over the GC threshold, garbage collection is triggered — and the issue occurs. I confirmed this hypothesis by:

- disabling GC in the subprocess (`gc.disable()`)
- drastically increasing the GC thresholds (`gc.set_threshold`)

Both approaches prevented the error. Conversely, forcing `gc.collect()` in the subprocess consistently reproduced the issue.

## Solutions

There are three possible approaches:

1. Keep a strong reference to connections (e.g. in a dictionary) to prevent GC.
2. Disable GC in subprocesses.
3. Avoid calling `dispose()`; instead, create a new engine in the subprocess.

Among these, (3) is the cleanest and aligns with SQLAlchemy's official recommendation to create new engine and session objects when forking processes. After replacing `dispose()` with logic that creates a new engine in subprocesses, I confirmed that existing connections were no longer finalized and the bug disappeared.
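Approach (3) can be sketched roughly as follows. The helper name `reconfigure_orm_in_child` is mine, not the actual Airflow code, and SQLite stands in for the real MySQL connection string; the point is only that the child builds a brand-new engine (and therefore a brand-new pool) instead of disposing the inherited one:

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker


def reconfigure_orm_in_child(sql_alchemy_conn: str):
    """Hypothetical helper: run in the forked child INSTEAD of calling
    engine.dispose(close=False) on the inherited engine. A fresh engine gets
    a fresh pool, so the parent's pool object and its connections keep their
    strong references and are never finalized by GC in the child."""
    engine = create_engine(sql_alchemy_conn, pool_pre_ping=True)
    session_factory = sessionmaker(bind=engine)
    return engine, session_factory


# Usage sketch: the child works entirely against its own engine/pool.
engine, Session = reconfigure_orm_in_child("sqlite://")
with Session() as session:
    value = session.execute(text("SELECT 1")).scalar()
```

Because the inherited pool is simply never touched, nothing in the child can emit a `Quit` on a socket the parent is still using.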
