Copilot commented on code in PR #68118:
URL: https://github.com/apache/airflow/pull/68118#discussion_r3367146269
##########
airflow-core/src/airflow/utils/sqlalchemy.py:
##########
@@ -461,6 +461,38 @@ def lock_rows(query: Select, session: Session) ->
Generator[None, None, None]:
del locked_rows
+@contextlib.contextmanager
+def with_db_lock_timeout(session: Session, lock_timeout: int = 30) ->
Generator[None, None, None]:
+ """
+ Context manager to set the database lock timeout for the current session.
+
+ This prevents long-running operations from blocking indefinitely if they
encounter
+ lock contention. Only supported on PostgreSQL and MySQL.
+
+ :param session: ORM Session
+ :param lock_timeout: Lock timeout in seconds.
+ """
+ try:
+ dialect_name = get_dialect_name(session)
+ except ValueError:
+ dialect_name = None
+
+ old_mysql_timeout = None
Review Comment:
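`with_db_lock_timeout` accepts `lock_timeout` but doesn’t validate it.
Passing `0` can disable the timeout entirely, and a negative value can
produce invalid SQL; either case undermines the intent of preventing
indefinite hangs. Consider rejecting non-positive values up front (for
example, by raising `ValueError` before any SQL is issued).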
##########
airflow-core/tests/unit/dag_processing/test_manager.py:
##########
@@ -1040,6 +1040,45 @@ def
test_deactivate_stale_dags_marks_dags_in_inactive_bundles(self, session):
)
assert is_stale_by_dag == {"dag_in_inactive_bundle": True,
"dag_in_active_bundle": False}
+ @mock.patch("airflow.dag_processing.manager.is_lock_not_available_error")
+ @pytest.mark.usefixtures("testing_dag_bundle")
+ def test_deactivate_stale_dags_handles_lock_timeout(self,
mock_is_lock_not_available, session, caplog):
+ """Dags deactivation should gracefully handle database lock
timeouts."""
+ from sqlalchemy.exc import OperationalError
+
Review Comment:
The test adds an import inside the test function body. In this repo, imports
are expected to be at module level (the exceptions are circular imports, lazy
loading for worker isolation, and `TYPE_CHECKING` blocks). Please move
`from sqlalchemy.exc import OperationalError` to the top of the test module.
##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -903,15 +920,28 @@ def deactivate_deleted_dags(self, bundle_name: str,
present: set[DagFileInfo]) -
"""Deactivate DAGs that come from files that are no longer present in
bundle."""
observed_filelocs = self._get_observed_filelocs(present)
with create_session() as session:
- any_deactivated = DagModel.deactivate_deleted_dags(
- bundle_name=bundle_name,
- rel_filelocs=observed_filelocs,
- session=session,
- )
- # Only run cleanup if we actually deactivated any DAGs
- # This avoids unnecessary DELETE queries in the common case where
no DAGs were deleted
- if any_deactivated:
- remove_references_to_deleted_dags(session=session)
+ try:
+ with with_db_lock_timeout(session=session, lock_timeout=30):
+ any_deactivated = DagModel.deactivate_deleted_dags(
+ bundle_name=bundle_name,
+ rel_filelocs=observed_filelocs,
+ session=session,
+ )
+ # Only run cleanup if we actually deactivated any DAGs
+ # This avoids unnecessary DELETE queries in the common
case where no DAGs were deleted
+ if any_deactivated:
+ remove_references_to_deleted_dags(session=session)
+ session.flush()
+ except OperationalError as e:
+ if is_lock_not_available_error(e):
+ self.log.warning(
+ "Lock not available when deactivating deleted DAGs for
bundle %s. "
+ "Skipping this iteration to prevent processor hang.",
+ bundle_name,
+ )
+ session.rollback()
+ else:
+ raise
Review Comment:
Lock-timeout handling was added to `deactivate_deleted_dags` (warning +
rollback + skip), but there’s no unit test asserting this behavior (unlike
`deactivate_stale_dags`). This path is important to prevent startup hangs and
should be covered with a regression test that forces `OperationalError` and
asserts rollback + warning log.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]