dstandish commented on code in PR #27072:
URL: https://github.com/apache/airflow/pull/27072#discussion_r996358492
##########
airflow/models/trigger.py:
##########
@@ -201,10 +201,12 @@ def assign_unassigned(cls, triggerer_id, capacity, session=None):
             # notin_ doesn't find NULL rows
             .filter(or_(cls.triggerer_id.is_(None), cls.triggerer_id.notin_(alive_triggerer_ids)))
             .limit(capacity)
+            .with_for_update(skip_locked=True)
             .all()
         )
-        session.query(cls).filter(cls.id.in_([i.id for i in trigger_ids_query])).update(
-            {cls.triggerer_id: triggerer_id},
-            synchronize_session=False,
-        )
+        if trigger_ids_query:
+            session.query(cls).filter(cls.id.in_([i.id for i in trigger_ids_query])).update(
+                {cls.triggerer_id: triggerer_id},
+                synchronize_session=False,
+            )
Review Comment:
I guess since we're trying to limit concurrency, we would need to do `update
... limit`, and on Postgres that is only achievable with a subquery / CTE.
Judging by this [Stack Exchange
answer](https://dba.stackexchange.com/a/69497), we would still need `FOR
UPDATE SKIP LOCKED` inside that subquery, although it would still probably be
beneficial to avoid the round trip.
@andrewgodwin any concern for you with adding locking on the trigger table?
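
For reference, a minimal sketch of the single-statement shape I have in mind. Everything here is an assumption for illustration and not what the PR does: the `demo_trigger` table, the `make_demo_db` helper and this standalone `assign_unassigned` function are hypothetical, and the filter is simplified to `triggerer_id IS NULL` (the dead-triggerer `notin_` branch is left out). SQLite has no `FOR UPDATE SKIP LOCKED`, so the runnable part only shows the capped update in one round trip; the Postgres form with the locking clause is kept as a string and not executed.

```python
import sqlite3

# Postgres form of the same idea (not executed here): the row cap and the
# lock both live in the subquery, so no separate SELECT round trip is needed.
POSTGRES_SQL = """
UPDATE trigger
SET triggerer_id = %(triggerer_id)s
WHERE id IN (
    SELECT id FROM trigger
    WHERE triggerer_id IS NULL
    LIMIT %(capacity)s
    FOR UPDATE SKIP LOCKED
)
"""


def make_demo_db(n_rows):
    """Create an in-memory table with n_rows unassigned rows (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE demo_trigger (id INTEGER PRIMARY KEY, triggerer_id INTEGER)"
    )
    conn.executemany(
        "INSERT INTO demo_trigger (triggerer_id) VALUES (?)", [(None,)] * n_rows
    )
    conn.commit()
    return conn


def assign_unassigned(conn, triggerer_id, capacity):
    """Claim up to `capacity` unassigned rows in one statement; return rows claimed."""
    cur = conn.execute(
        """
        UPDATE demo_trigger
        SET triggerer_id = ?
        WHERE id IN (
            SELECT id FROM demo_trigger
            WHERE triggerer_id IS NULL
            ORDER BY id
            LIMIT ?
        )
        """,
        (triggerer_id, capacity),
    )
    conn.commit()
    return cur.rowcount
```

With five unassigned rows and a capacity of 3, successive calls for different triggerers claim 3, then 2, then 0 rows. The concurrency safety still comes from the locking clause in the Postgres variant, so this would not remove the need for the lock, only the extra query.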