wjddn279 opened a new pull request, #56044:
URL: https://github.com/apache/airflow/pull/56044

   Hello,
   
   While using the latest version of Airflow (3.0.6), I discovered a bug. In this PR I describe the root cause, possible solutions, and a fix.
   Since the full explanation is quite long, I have omitted some details; please feel free to leave comments if you would like clarification. I also plan to add test cases in line with your feedback.
   
   ## Problem
   I deployed Airflow in a Kubernetes environment and observed that the dag-processor was restarting intermittently. The error logs showed the following issues (full log attached: [full_log.txt](https://github.com/user-attachments/files/22513572/full_log.txt)):
   
   <details>
     <summary>error log 1</summary>
   [2025-09-24T17:01:10.824+0900] {retries.py:95} DEBUG - Running 
DagWarning.purge_inactive_dag_warnings with retries. Try 1 of 3
   [2025-09-24T17:01:10.827+0900] {dag_processor_job_runner.py:63} ERROR - 
Exception when executing DagProcessorJob
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 1910, in _execute_context
       self.dialect.do_execute(
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py",
 line 736, in do_execute
       cursor.execute(statement, parameters)
     File 
"/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 
179, in execute
       res = self._query(mogrified_query)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 
330, in _query
       db.query(q)
     File 
"/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/connections.py", 
line 280, in query
       _mysql.connection.query(self, query)
   MySQLdb.OperationalError: (2013, 'Lost connection to server during query')
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/dag_processor_job_runner.py",
 line 61, in _execute
       self.processor.run()
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py",
 line 269, in run
       return self._run_parsing_loop()
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py",
 line 340, in _run_parsing_loop
       self._queue_requested_files_for_parsing()
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py",
 line 408, in _queue_requested_files_for_parsing
       files = self._get_priority_files()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", 
line 101, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py",
 line 426, in _get_priority_files
       requests = session.scalars(
                  ^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", 
line 1778, in scalars
       return self.execute(
              ^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", 
line 1717, in execute
       result = conn._execute_20(statement, params or {}, execution_options)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 1710, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", 
line 334, in _execute_on_connection
       return connection._execute_clauseelement(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 1577, in _execute_clauseelement
       ret = self._execute_context(
             ^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 1953, in _execute_context
       self._handle_dbapi_exception(
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 2134, in _handle_dbapi_exception
       util.raise_(
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
       raise exception
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 1910, in _execute_context
       self.dialect.do_execute(
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py",
 line 736, in do_execute
       cursor.execute(statement, parameters)
     File 
"/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 
179, in execute
       res = self._query(mogrified_query)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 
330, in _query
       db.query(q)
     File 
"/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/connections.py", 
line 280, in query
       _mysql.connection.query(self, query)
   sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2013, 'Lost 
connection to server during query')
   [SQL: SELECT dag_priority_parsing_request.id, 
dag_priority_parsing_request.bundle_name, 
dag_priority_parsing_request.relative_fileloc 
   FROM dag_priority_parsing_request 
   WHERE dag_priority_parsing_request.bundle_name IN (%s)]
   [parameters: ('dags-folder',)]
   (Background on this error at: https://sqlalche.me/e/14/e3q8)
   2025-09-24 17:01:10 [info     ] Process exited                 [supervisor] 
exit_code=<Negsignal.SIGTERM: -15> pid=268 signal_sent=SIGTERM
   2025-09-24 17:01:10 [info     ] Process exited                 [supervisor] 
exit_code=<Negsignal.SIGTERM: -15> pid=270 signal_sent=SIGTERM
   [2025-09-24T17:01:10.860+0900] {process_utils.py:285} INFO - Waiting up to 5 
seconds for processes to exit...
   [2025-09-24T17:01:10.860+0900] {listener.py:37} DEBUG - Calling 
'before_stopping' with {'component': <airflow.jobs.job.Job object at 
0x7f4bbf19ad90>}
   [2025-09-24T17:01:10.860+0900] {listener.py:38} DEBUG - Hook impls: []
   [2025-09-24T17:01:10.861+0900] {listener.py:42} DEBUG - Result from 
'before_stopping': []
   [2025-09-24T17:01:10.876+0900] {cli_action_loggers.py:97} DEBUG - Calling 
callbacks: []
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 1910, in _execute_context
       self.dialect.do_execute(
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py",
 line 736, in do_execute
       cursor.execute(statement, parameters)
     File 
"/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 
179, in execute
       res = self._query(mogrified_query)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 
330, in _query
       db.query(q)
     File 
"/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/connections.py", 
line 280, in query
       _mysql.connection.query(self, query)
   MySQLdb.OperationalError: (2013, 'Lost connection to server during query')
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/home/airflow/.local/bin/airflow", line 7, in <module>
       sys.exit(main())
                ^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/__main__.py", line 
55, in main
       args.func(args)
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/cli_config.py", 
line 48, in command
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/cli.py", line 
112, in wrapper
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/providers_configuration_loader.py",
 line 55, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/dag_processor_command.py",
 line 54, in dag_processor
       run_command_with_daemon_option(
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/daemon_utils.py",
 line 86, in run_command_with_daemon_option
       callback()
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/dag_processor_command.py",
 line 57, in <lambda>
       callback=lambda: run_job(job=job_runner.job, 
execute_callable=job_runner._execute),
                        
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", 
line 101, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py", line 
347, in run_job
       return execute_job(job, execute_callable=execute_callable)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py", line 
376, in execute_job
       ret = execute_callable()
             ^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/dag_processor_job_runner.py",
 line 61, in _execute
       self.processor.run()
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py",
 line 269, in run
       return self._run_parsing_loop()
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py",
 line 340, in _run_parsing_loop
       self._queue_requested_files_for_parsing()
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py",
 line 408, in _queue_requested_files_for_parsing
       files = self._get_priority_files()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", 
line 101, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/manager.py",
 line 426, in _get_priority_files
       requests = session.scalars(
                  ^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", 
line 1778, in scalars
       return self.execute(
              ^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", 
line 1717, in execute
       result = conn._execute_20(statement, params or {}, execution_options)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 1710, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", 
line 334, in _execute_on_connection
       return connection._execute_clauseelement(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 1577, in _execute_clauseelement
       ret = self._execute_context(
             ^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 1953, in _execute_context
       self._handle_dbapi_exception(
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 2134, in _handle_dbapi_exception
       util.raise_(
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
       raise exception
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 1910, in _execute_context
       self.dialect.do_execute(
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py",
 line 736, in do_execute
       cursor.execute(statement, parameters)
     File 
"/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 
179, in execute
       res = self._query(mogrified_query)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 
330, in _query
       db.query(q)
     File 
"/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/connections.py", 
line 280, in query
       _mysql.connection.query(self, query)
   sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2013, 'Lost 
connection to server during query')
   [SQL: SELECT dag_priority_parsing_request.id, 
dag_priority_parsing_request.bundle_name, 
dag_priority_parsing_request.relative_fileloc 
   FROM dag_priority_parsing_request 
   WHERE dag_priority_parsing_request.bundle_name IN (%s)]
   [parameters: ('dags-folder',)]
   (Background on this error at: https://sqlalche.me/e/14/e3q8)
   [2025-09-24T17:01:10.882+0900] {settings.py:494} DEBUG - Disposing DB 
connection pool (PID 7)
   </details>
   
   <details>
     <summary>error log 2</summary>
   gw_contents_server_tech.daily.gw_daily_webtoon_episode_video to the DB
   [2025-09-24T16:57:52.086+0900] {before_sleep.py:65} DEBUG - Retrying 
<unknown> in 0.24866784005501685 seconds as it raised OperationalError: 
(MySQLdb.OperationalError) (2013, 'Lost connection to server during query')
   [SQL: SELECT ab_permission_view.permission_id AS 
ab_permission_view_permission_id, ab_permission_view.view_menu_id AS 
ab_permission_view_view_menu_id, ab_permission_view.id AS 
ab_permission_view_id, ab_permission_1.id AS ab_permission_1_id, 
ab_permission_1.name AS ab_permission_1_name, ab_view_menu_1.id AS 
ab_view_menu_1_id, ab_view_menu_1.name AS ab_view_menu_1_name 
   FROM ab_permission_view LEFT OUTER JOIN ab_permission AS ab_permission_1 ON 
ab_permission_1.id = ab_permission_view.permission_id LEFT OUTER JOIN 
ab_view_menu AS ab_view_menu_1 ON ab_view_menu_1.id = 
ab_permission_view.view_menu_id 
   WHERE %s = ab_permission_view.permission_id AND %s = 
ab_permission_view.view_menu_id]
   [parameters: (11, 190)]
   (Background on this error at: https://sqlalche.me/e/14/e3q8).
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 1910, in _execute_context
       self.dialect.do_execute(
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py",
 line 736, in do_execute
       cursor.execute(statement, parameters)
     File 
"/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 
179, in execute
       res = self._query(mogrified_query)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 
330, in _query
       db.query(q)
     File 
"/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/connections.py", 
line 280, in query
       _mysql.connection.query(self, query)
   MySQLdb.OperationalError: (2013, 'Lost connection to server during query')
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py",
 line 355, in update_dag_parsing_results_in_db
       _serialize_dag_capturing_errors(
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py",
 line 203, in _serialize_dag_capturing_errors
       _sync_dag_perms(dag, session=session)
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/dag_processing/collection.py",
 line 223, in _sync_dag_perms
       security_manager.sync_perm_for_dag(dag_id, dag.access_control)
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/fab/auth_manager/security_manager/override.py",
 line 975, in sync_perm_for_dag
       self.create_permission(dag_action_name, dag_resource_name)
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/fab/auth_manager/security_manager/override.py",
 line 1630, in create_permission
       perm = self.get_permission(action_name, resource_name)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/fab/auth_manager/security_manager/override.py",
 line 1607, in get_permission
       .one_or_none()
        ^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/query.py", 
line 2850, in one_or_none
       return self._iter().one_or_none()
              ^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/query.py", 
line 2916, in _iter
       result = self.session.execute(
                ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", 
line 1717, in execute
       result = conn._execute_20(statement, params or {}, execution_options)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 1710, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", 
line 334, in _execute_on_connection
       return connection._execute_clauseelement(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 1577, in _execute_clauseelement
       ret = self._execute_context(
             ^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 1953, in _execute_context
       self._handle_dbapi_exception(
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 2134, in _handle_dbapi_exception
       util.raise_(
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
       raise exception
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 1910, in _execute_context
       self.dialect.do_execute(
     File 
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py",
 line 736, in do_execute
       cursor.execute(statement, parameters)
     File 
"/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 
179, in execute
       res = self._query(mogrified_query)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 
330, in _query
       db.query(q)
     File 
"/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/connections.py", 
line 280, in query
       _mysql.connection.query(self, query)
   sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2013, 'Lost 
connection to server during query')
   [SQL: SELECT ab_permission_view.permission_id AS 
ab_permission_view_permission_id, ab_permission_view.view_menu_id AS 
ab_permission_view_view_menu_id, ab_permission_view.id AS 
ab_permission_view_id, ab_permission_1.id AS ab_permission_1_id, 
ab_permission_1.name AS ab_permission_1_name, ab_view_menu_1.id AS 
ab_view_menu_1_id, ab_view_menu_1.name AS ab_view_menu_1_name 
   FROM ab_permission_view LEFT OUTER JOIN ab_permission AS ab_permission_1 ON 
ab_permission_1.id = ab_permission_view.permission_id LEFT OUTER JOIN 
ab_view_menu AS ab_view_menu_1 ON ab_view_menu_1.id = 
ab_permission_view.view_menu_id 
   WHERE %s = ab_permission_view.permission_id AND %s = 
ab_permission_view.view_menu_id]
   [parameters: (11, 190)]
   (Background on this error at: https://sqlalche.me/e/14/e3q8)
   [2025-09-24T16:57:52.341+0900] {collection.py:344} DEBUG - Running 
dagbag.bulk_write_to_db with retries. Try 2 of 3
   </details>
   
   As error log 1 shows, the `_get_priority_files()` function executes a query during which the MySQL connection is unexpectedly closed. This raises an exception and causes the dag-processor to exit.
   I also identified other exceptions (error log 2). While those do not lead to termination (they are covered by retry logic and exception handling), they appear to be caused by the same underlying issue: sudden termination of MySQL connections.
   
   By reviewing the queries arriving at MySQL at the time of the errors, I confirmed that the connection was indeed being closed with a `Quit` command.
   <img width="1062" height="138" alt="Image" src="https://github.com/user-attachments/assets/c6df1ef8-0d01-4e55-9aef-111771741f67" />
   
   
   ## Root Cause
   
   ### Why are DB connections suddenly closed?
   
   At first, I suspected an issue with MySQL itself. However, since the exact 
same database worked fine with Airflow 2.10.2 and only started failing after 
upgrading to 3.0.6, I concluded the root cause lies in Airflow’s application 
code.
   
   To investigate, I modified the code to track when connection objects were 
finalized by attaching a finalizer when connections were created.
    <details>
      <summary>code</summary>
   
      # Requires: import os, weakref; from datetime import datetime
      # (`engine` and `log` come from the surrounding module.)
      @event.listens_for(engine, "connect")
      def set_mysql_timezone(dbapi_connection, connection_record):
          log.debug(f"[connect] New DB connection established, id={os.getpid()}")

          weakref.finalize(
              dbapi_connection,
              lambda: print(f"{datetime.now().isoformat()} dbapi_connection finalized via weakref in os {os.getpid()}"),
          )
          weakref.finalize(
              connection_record,
              lambda: print(f"{datetime.now().isoformat()} connection_record finalized via weakref in os {os.getpid()}"),
          )
   </details>
   
   
   After running with this change, I observed that during dag parsing, when the dag-processor forks child processes, the following logs appeared at exactly the moment the `Quit` command reached MySQL (PID 417 is the subprocess):
   ```
   2025-09-22T13:41:30.352393 connection_record finalized via weakref in os 417
   2025-09-22T13:41:30.352403 dbapi_connection finalized via weakref in os 417
   ```
   
   This shows that in the forked subprocess, connection objects inherited from the parent process were garbage-collected, which sent the `Quit` command and closed the underlying DB connection.
   As a result, the parent process attempts to reuse a connection that has already been closed by its child process, leading to failures (e.g., a pre-ping failure or query execution on a closed connection).
   
   ### Why are connection objects being garbage-collected?
   The cause lies in this code:
   
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/settings.py#L426-L436
   
   The call to `engine.dispose(close=False)` replaces the `engine.pool` object.
   Once the engine's pool is replaced, existing connections are detached from pool management and retained only through weak references, so they are no longer protected from garbage collection in forked child processes. Once GC runs in a subprocess, those connections are finalized and the DB connection is closed, which then affects the parent process.
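   The lifecycle described above can be sketched without SQLAlchemy: an object that is reachable only through a reference cycle survives until the cyclic collector runs, and only then does its finalizer fire (here standing in for the code path that sends `Quit`). `Conn` and the `events` list are illustrative names, not anything from Airflow or SQLAlchemy:

   ```python
   import gc
   import weakref

   class Conn:
       """Stand-in for a pooled DBAPI connection (illustrative only)."""
       def __init__(self):
           self._cycle = self  # reference cycle: refcounting alone cannot free this

   events = []

   gc.disable()  # make the moment of collection explicit
   c = Conn()
   weakref.finalize(c, lambda: events.append("quit sent"))

   del c                # last strong reference gone, but the cycle keeps the object alive
   assert events == []  # not finalized yet: nothing happens until the collector runs

   gc.collect()         # cyclic GC frees the object and the finalizer fires
   assert events == ["quit sent"]
   gc.enable()
   ```

   This mirrors why the failure is timing-dependent: the connection is not closed at fork time, only at whatever later moment the child's collector happens to run.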
   
   However, this issue does not occur in all cases.
   In most child processes, GC is never triggered before the process exits, so nothing goes wrong. But if a dag file is large enough that parsing it allocates enough objects to cross the GC threshold, garbage collection is triggered and the issue occurs.
   
   I confirmed this hypothesis by:
   - disabling GC in the subprocess (`gc.disable()`)
   - drastically increasing the GC threshold (`gc.set_threshold`)
   
   Both approaches prevented the error. Conversely, forcing `gc.collect()` in 
the subprocess consistently reproduced the issue.
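   The role of the threshold can be observed directly with `gc.callbacks` (a standalone sketch; the allocation counts are illustrative and do not model Airflow's actual parsing workload):

   ```python
   import gc

   def auto_collections_while_allocating(n):
       """Count automatic GC runs triggered while allocating n container objects."""
       runs = []
       def observer(phase, info):
           if phase == "start":
               runs.append(info["generation"])
       gc.callbacks.append(observer)
       try:
           keep = [[i] for i in range(n)]  # each list allocation bumps the gen-0 counter
       finally:
           gc.callbacks.remove(observer)
       return len(runs)

   gc.enable()
   default_runs = auto_collections_while_allocating(10_000)  # default gen-0 threshold is 700

   old = gc.get_threshold()
   gc.set_threshold(1_000_000)  # drastically raised, as in the experiment above
   raised_runs = auto_collections_while_allocating(10_000)
   gc.set_threshold(*old)

   assert default_runs > 0   # GC fired repeatedly during allocation
   assert raised_runs == 0   # with the huge threshold, GC never ran
   ```

   This matches the observed behaviour: whether the bug reproduces depends entirely on whether the child's allocations happen to cross the collection threshold.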
   
   ## Solutions
   There are three possible approaches:
   1. Keep a strong reference to connections (e.g., in a dictionary) to prevent GC.
   2. Disable GC in subprocesses.
   3. Avoid calling `dispose()`; instead, create a new engine in the subprocess.
   
   Among these, (3) is the cleanest and aligns with SQLAlchemy's official recommendation: create new engine and session objects when forking processes.
   
   By replacing `dispose()` with logic that creates a new engine in subprocesses, I confirmed that existing connections were no longer finalized and the bug disappeared.
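   A minimal sketch of approach (3), assuming a SQLAlchemy engine and using an in-memory SQLite URL as a stand-in for the real connection string; `_recreate_engine_in_child` is an illustrative name, not the function added in this PR:

   ```python
   import os

   from sqlalchemy import create_engine, text
   from sqlalchemy.pool import NullPool

   # Illustrative DSN; Airflow would use its configured sql_alchemy_conn.
   SQL_ALCHEMY_CONN = "sqlite://"

   engine = create_engine(SQL_ALCHEMY_CONN, poolclass=NullPool)

   def _recreate_engine_in_child():
       """Hypothetical after-fork hook: build a fresh engine instead of calling
       engine.dispose(close=False), so the parent's pooled connections are never
       detached from their pool and left to the child's GC."""
       global engine
       engine = create_engine(SQL_ALCHEMY_CONN, poolclass=NullPool)

   # Run the hook automatically in every forked child (POSIX only).
   os.register_at_fork(after_in_child=_recreate_engine_in_child)

   # The engine works as usual; in a forked child it would be a brand-new object.
   with engine.connect() as conn:
       assert conn.execute(text("SELECT 1")).scalar() == 1
   ```

   With `os.register_at_fork`, every forked child gets its own engine and pool, so the parent's checked-out connections are never referenced, finalized, or closed from the child.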
   
   