This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c2079585e4 Fix DagFileProcessorManager silent hang on DB lock contention (#68118)
0c2079585e4 is described below

commit 0c2079585e4eee5154ab94a6876830aabe15e171
Author: Subham <[email protected]>
AuthorDate: Sun Jun 7 13:18:36 2026 +0530

    Fix DagFileProcessorManager silent hang on DB lock contention (#68118)
    
    * Fix DagFileProcessorManager silent hang on DB lock contention
    
    * Change unsupported lock timeout warning to debug log
---
 airflow-core/src/airflow/dag_processing/manager.py | 68 ++++++++++++++++------
 airflow-core/src/airflow/utils/sqlalchemy.py       | 44 +++++++++++++-
 .../tests/unit/dag_processing/test_manager.py      | 56 ++++++++++++++++++
 3 files changed, 148 insertions(+), 20 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index fc00b6730c8..37f0496a89c 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -40,6 +40,7 @@ from typing import TYPE_CHECKING, Any, Literal, NamedTuple, cast
 import attrs
 import structlog
 from sqlalchemy import select, update
+from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm import load_only
 from tabulate import tabulate
 from uuid6 import uuid7
@@ -76,7 +77,12 @@ from airflow.utils.process_utils import (
 )
 from airflow.utils.retries import retry_db_transaction
 from airflow.utils.session import NEW_SESSION, create_session, provide_session
-from airflow.utils.sqlalchemy import prohibit_commit, with_row_locks
+from airflow.utils.sqlalchemy import (
+    is_lock_not_available_error,
+    prohibit_commit,
+    with_db_lock_timeout,
+    with_row_locks,
+)
 
 if TYPE_CHECKING:
     from collections.abc import Callable, Iterable, Iterator, Sequence
@@ -453,15 +459,26 @@ class DagFileProcessorManager(LoggingMixin):
                     to_deactivate.add(dag.dag_id)
 
         if to_deactivate:
-            deactivated_dagmodel = session.execute(
-                update(DagModel)
-                .where(DagModel.dag_id.in_(to_deactivate))
-                .values(is_stale=True)
-                .execution_options(synchronize_session="fetch")
-            )
-            deactivated = getattr(deactivated_dagmodel, "rowcount", 0)
-            if deactivated:
-                self.log.info("Deactivated %i DAGs which are no longer present in file.", deactivated)
+            try:
+                with with_db_lock_timeout(session=session, lock_timeout=30):
+                    deactivated_dagmodel = session.execute(
+                        update(DagModel)
+                        .where(DagModel.dag_id.in_(to_deactivate))
+                        .values(is_stale=True)
+                        .execution_options(synchronize_session="fetch")
+                    )
+                    deactivated = getattr(deactivated_dagmodel, "rowcount", 0)
+                    if deactivated:
+                        self.log.info("Deactivated %i DAGs which are no longer present in file.", deactivated)
+            except OperationalError as e:
+                if is_lock_not_available_error(e):
+                    self.log.warning(
+                        "Lock not available when deactivating stale DAGs. "
+                        "Skipping this iteration to prevent processor hang."
+                    )
+                    session.rollback()
+                else:
+                    raise
 
     def _run_parsing_loop(self):
         # initialize cache to mutualize calls to Variable.get in DAGs
@@ -903,15 +920,28 @@ class DagFileProcessorManager(LoggingMixin):
         """Deactivate DAGs that come from files that are no longer present in bundle."""
         observed_filelocs = self._get_observed_filelocs(present)
         with create_session() as session:
-            any_deactivated = DagModel.deactivate_deleted_dags(
-                bundle_name=bundle_name,
-                rel_filelocs=observed_filelocs,
-                session=session,
-            )
-            # Only run cleanup if we actually deactivated any DAGs
-            # This avoids unnecessary DELETE queries in the common case where no DAGs were deleted
-            if any_deactivated:
-                remove_references_to_deleted_dags(session=session)
+            try:
+                with with_db_lock_timeout(session=session, lock_timeout=30):
+                    any_deactivated = DagModel.deactivate_deleted_dags(
+                        bundle_name=bundle_name,
+                        rel_filelocs=observed_filelocs,
+                        session=session,
+                    )
+                    # Only run cleanup if we actually deactivated any DAGs
+                    # This avoids unnecessary DELETE queries in the common case where no DAGs were deleted
+                    if any_deactivated:
+                        remove_references_to_deleted_dags(session=session)
+                    session.flush()
+            except OperationalError as e:
+                if is_lock_not_available_error(e):
+                    self.log.warning(
+                        "Lock not available when deactivating deleted DAGs for bundle %s. "
+                        "Skipping this iteration to prevent processor hang.",
+                        bundle_name,
+                    )
+                    session.rollback()
+                else:
+                    raise
 
     def print_stats(self, known_files: dict[str, set[DagFileInfo]]):
         """Occasionally print out stats about how fast the files are getting processed."""
diff --git a/airflow-core/src/airflow/utils/sqlalchemy.py b/airflow-core/src/airflow/utils/sqlalchemy.py
index 35a6f4ee05b..a9508712cbc 100644
--- a/airflow-core/src/airflow/utils/sqlalchemy.py
+++ b/airflow-core/src/airflow/utils/sqlalchemy.py
@@ -24,7 +24,7 @@ import logging
 from collections.abc import Generator
 from typing import TYPE_CHECKING, Any
 
-from sqlalchemy import TIMESTAMP, PickleType, String, event, nullsfirst
+from sqlalchemy import TIMESTAMP, PickleType, String, event, nullsfirst, text
 from sqlalchemy.dialects import mysql
 from sqlalchemy.dialects.postgresql import JSONB
 from sqlalchemy.ext.compiler import compiles
@@ -461,6 +461,48 @@ def lock_rows(query: Select, session: Session) -> Generator[None, None, None]:
     del locked_rows
 
 
+@contextlib.contextmanager
+def with_db_lock_timeout(session: Session, lock_timeout: int = 30) -> Generator[None, None, None]:
+    """
+    Context manager to set the database lock timeout for the current session.
+
+    This prevents long-running operations from blocking indefinitely if they encounter
+    lock contention. Only supported on PostgreSQL and MySQL.
+
+    :param session: ORM Session
+    :param lock_timeout: Lock timeout in seconds.
+    """
+    if lock_timeout <= 0:
+        raise ValueError("lock_timeout must be a positive integer number of seconds")
+
+    try:
+        dialect_name = get_dialect_name(session)
+    except ValueError:
+        dialect_name = None
+
+    old_mysql_timeout = None
+
+    if dialect_name == "postgresql":
+        # SET LOCAL applies only to the current transaction and resets on COMMIT/ROLLBACK.
+        session.execute(text(f"SET LOCAL lock_timeout = '{lock_timeout}s'"))
+    elif dialect_name == "mysql":
+        old_mysql_timeout = session.execute(text("SELECT @@SESSION.innodb_lock_wait_timeout")).scalar()
+        session.execute(text(f"SET SESSION innodb_lock_wait_timeout = {lock_timeout}"))
+    else:
+        log.debug(
+            "Database lock timeout is not supported for dialect '%s'. "
+            "The requested timeout of %ss will not be applied.",
+            dialect_name,
+            lock_timeout,
+        )
+
+    try:
+        yield
+    finally:
+        if dialect_name == "mysql" and old_mysql_timeout is not None:
+            session.execute(text(f"SET SESSION innodb_lock_wait_timeout = {old_mysql_timeout}"))
+
+
 class CommitProhibitorGuard:
     """Context manager class that powers prohibit_commit."""
 
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 941d090f229..5b6fc1608e2 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -39,6 +39,7 @@ import msgspec
 import pytest
 import time_machine
 from sqlalchemy import func, select
+from sqlalchemy.exc import OperationalError
 from uuid6 import uuid7
 
 from airflow._shared.timezones import timezone
@@ -1040,6 +1041,61 @@ class TestDagFileProcessorManager:
         )
         assert is_stale_by_dag == {"dag_in_inactive_bundle": True, "dag_in_active_bundle": False}
 
+    @mock.patch("airflow.dag_processing.manager.is_lock_not_available_error")
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def test_deactivate_stale_dags_handles_lock_timeout(self, mock_is_lock_not_available, session, caplog):
+        """Dags deactivation should gracefully handle database lock timeouts."""
+        from sqlalchemy.exc import OperationalError
+
+        session.add(DagBundleModel(name="gone-bundle"))
+        session.flush()
+        session.execute(
+            DagBundleModel.__table__.update().where(DagBundleModel.name == "gone-bundle").values(active=False)
+        )
+        session.add(
+            DagModel(
+                dag_id="dag_in_inactive_bundle",
+                bundle_name="gone-bundle",
+                relative_fileloc="some_file.py",
+                last_parsed_time=timezone.utcnow(),
+                is_stale=False,
+            )
+        )
+        session.flush()
+
+        manager = DagFileProcessorManager(max_runs=1, processor_timeout=10 * 60)
+
+        # Mock session.execute to raise OperationalError only when updating DagModel
+        original_execute = session.execute
+
+        def mock_execute(*args, **kwargs):
+            if hasattr(args[0], "table") and getattr(args[0].table, "name", "") == "dag":
+                raise OperationalError("Lock wait timeout exceeded", params={}, orig=Exception())
+            return original_execute(*args, **kwargs)
+
+        mock_is_lock_not_available.return_value = True
+
+        with mock.patch.object(session, "execute", side_effect=mock_execute):
+            manager.deactivate_stale_dags(last_parsed={})
+
+        assert "Lock not available when deactivating stale DAGs" in caplog.text
+
+    @mock.patch("airflow.dag_processing.manager.is_lock_not_available_error")
+    @mock.patch("airflow.models.dag.DagModel.deactivate_deleted_dags")
+    def test_deactivate_deleted_dags_handles_lock_timeout(
+        self, mock_deactivate_deleted_dags, mock_is_lock_not_available, caplog
+    ):
+        """Dags deactivation of deleted dags should gracefully handle database lock timeouts."""
+        mock_deactivate_deleted_dags.side_effect = OperationalError(
+            "Lock wait timeout exceeded", params={}, orig=Exception()
+        )
+        mock_is_lock_not_available.return_value = True
+
+        manager = DagFileProcessorManager(max_runs=1)
+        manager.deactivate_deleted_dags(bundle_name="testing_bundle", present=set())
+
+        assert "Lock not available when deactivating deleted DAGs for bundle testing_bundle" in caplog.text
+
     @mock.patch("airflow.dag_processing.manager.BundleUsageTrackingManager")
     def test_cleanup_stale_bundle_versions_interval(self, mock_bundle_manager):
         manager = DagFileProcessorManager(max_runs=1)

Reply via email to