This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9c737f6d19 Resolve trigger assignment race condition (#27072)
9c737f6d19 is described below
commit 9c737f6d192ef864dd4cde89a0a90c53f5336566
Author: Pavan Sharma <[email protected]>
AuthorDate: Mon Oct 31 07:00:31 2022 +0530
Resolve trigger assignment race condition (#27072)
---
airflow/models/trigger.py | 21 +++++++++++----------
1 file changed, 11 insertions(+), 10 deletions(-)
diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index c91dde6066..57d2ac8f26 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -29,7 +29,7 @@ from airflow.triggers.base import BaseTrigger
from airflow.utils import timezone
from airflow.utils.retries import run_with_db_retries
from airflow.utils.session import provide_session
-from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
+from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime, with_row_locks
from airflow.utils.state import State
@@ -196,15 +196,16 @@ class Trigger(Base):
# Find triggers who do NOT have an alive triggerer_id, and then assign
# up to `capacity` of those to us.
- trigger_ids_query = (
+ trigger_ids_query = with_row_locks(
session.query(cls.id)
- # notin_ doesn't find NULL rows
.filter(or_(cls.triggerer_id.is_(None), cls.triggerer_id.notin_(alive_triggerer_ids)))
- .limit(capacity)
- .all()
- )
- session.query(cls).filter(cls.id.in_([i.id for i in trigger_ids_query])).update(
- {cls.triggerer_id: triggerer_id},
- synchronize_session=False,
- )
+ .limit(capacity),
+ session,
+ skip_locked=True,
+ ).all()
+ if trigger_ids_query:
+ session.query(cls).filter(cls.id.in_([i.id for i in trigger_ids_query])).update(
+ {cls.triggerer_id: triggerer_id},
+ synchronize_session=False,
+ )
session.commit()