This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9c737f6d19 Resolve trigger assignment race condition (#27072)
9c737f6d19 is described below

commit 9c737f6d192ef864dd4cde89a0a90c53f5336566
Author: Pavan Sharma <[email protected]>
AuthorDate: Mon Oct 31 07:00:31 2022 +0530

    Resolve trigger assignment race condition (#27072)
---
 airflow/models/trigger.py | 21 +++++++++++----------
 1 file changed, 11 insertions(+), 10 deletions(-)

diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index c91dde6066..57d2ac8f26 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -29,7 +29,7 @@ from airflow.triggers.base import BaseTrigger
 from airflow.utils import timezone
 from airflow.utils.retries import run_with_db_retries
 from airflow.utils.session import provide_session
-from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
+from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime, with_row_locks
 from airflow.utils.state import State
 
 
@@ -196,15 +196,16 @@ class Trigger(Base):
 
         # Find triggers who do NOT have an alive triggerer_id, and then assign
         # up to `capacity` of those to us.
-        trigger_ids_query = (
+        trigger_ids_query = with_row_locks(
             session.query(cls.id)
-            # notin_ doesn't find NULL rows
             .filter(or_(cls.triggerer_id.is_(None), 
cls.triggerer_id.notin_(alive_triggerer_ids)))
-            .limit(capacity)
-            .all()
-        )
-        session.query(cls).filter(cls.id.in_([i.id for i in 
trigger_ids_query])).update(
-            {cls.triggerer_id: triggerer_id},
-            synchronize_session=False,
-        )
+            .limit(capacity),
+            session,
+            skip_locked=True,
+        ).all()
+        if trigger_ids_query:
+            session.query(cls).filter(cls.id.in_([i.id for i in 
trigger_ids_query])).update(
+                {cls.triggerer_id: triggerer_id},
+                synchronize_session=False,
+            )
         session.commit()

Reply via email to