bujjibabukatta opened a new pull request, #68061:
URL: https://github.com/apache/airflow/pull/68061
When running 2+ schedulers (HA mode), DAGs using `PartitionedAssetTimetable`
or `CronPartitionTimetable` intermittently produce duplicate DagRuns for the
same asset event.
PR #60773 fixed this race for non-partitioned asset scheduling by adding
`with_row_locks` to the `AssetDagRunQueue` fetch. However, partitioned assets
flow through a separate code path,
`_create_dagruns_for_partitioned_asset_dags`, which reads
`AssetPartitionDagRun` rows with a plain `SELECT` and no lock.
Two schedulers can read the same unprocessed rows within ~40ms of each other
and independently create a DagRun for each.
Confirmed reproducible on MWAA 3.2.1 (which ships #60773) with 2 schedulers
and 2+ consumer DAGs on the same asset.
## Fix
Add `with_row_locks(..., skip_locked=True)` to the `AssetPartitionDagRun`
query in `_create_dagruns_for_partitioned_asset_dags`, mirroring exactly what
#60773 did for `AssetDagRunQueue`.
`SELECT ... FOR UPDATE SKIP LOCKED` ensures that while Scheduler A holds locks
on the `AssetPartitionDagRun` rows it is processing, Scheduler B's identical
query skips those rows. If every pending row is locked, B's query returns zero
rows and exits cleanly, so no duplicate DagRun is created.
`with_row_locks` is already imported in this file so no new imports are
needed.
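
To illustrate the difference between the two read modes, here is a minimal, database-free sketch of the race. It does not use Airflow or SQL: `PendingRows`, `select_plain`, `select_skip_locked`, and `run_two_schedulers` are hypothetical names invented for this illustration, and the in-memory sets only approximate the row-lock semantics of `SKIP LOCKED`.

```python
class PendingRows:
    """Toy in-memory stand-in for a table of unprocessed rows (hypothetical)."""

    def __init__(self, keys):
        self.pending = set(keys)  # rows not yet turned into runs
        self.locked = set()       # rows currently locked by some reader

    def select_plain(self):
        # Models a plain SELECT: every reader sees every pending row.
        return sorted(self.pending)

    def select_skip_locked(self):
        # Models SELECT ... FOR UPDATE SKIP LOCKED:
        # lock the rows that are free and skip the ones already locked.
        claimed = sorted(self.pending - self.locked)
        self.locked.update(claimed)
        return claimed

    def commit(self, keys):
        # Processed rows are removed and their locks released.
        self.pending.difference_update(keys)
        self.locked.difference_update(keys)


def run_two_schedulers(select_name):
    table = PendingRows(["2024-01-01", "2024-01-02"])
    select = getattr(table, select_name)
    created = []
    # Both schedulers read before either one commits (the race window).
    rows_a = select()
    rows_b = select()
    for scheduler, rows in (("A", rows_a), ("B", rows_b)):
        created.extend((scheduler, key) for key in rows)
        table.commit(rows)
    return created


plain = run_two_schedulers("select_plain")
locked = run_two_schedulers("select_skip_locked")
print(len(plain), len(locked))
```

In this toy model the plain read lets both schedulers create a run for each of the two rows, while the skip-locked read leaves the second scheduler with nothing to process.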
## Testing
- Reproduce with 2 schedulers + 2 consumer DAGs on a
  `PartitionedAssetTimetable` asset (steps from #68045)
- Verify duplicate run IDs no longer appear after the patch
- Existing scheduler unit tests should continue to pass
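
One way to check the "no duplicates" condition is to count runs per consumer DAG and partition. The sketch below assumes the observed runs have already been collected as `(dag_id, partition_key)` tuples; that shape, the sample values, and the helper name `find_duplicate_runs` are assumptions for illustration, not part of Airflow.

```python
from collections import Counter


def find_duplicate_runs(runs):
    """Return the (dag_id, partition_key) pairs that occur more than once."""
    counts = Counter(runs)
    return sorted(pair for pair, n in counts.items() if n > 1)


# Hypothetical sample: consumer_a ran twice for the same partition.
observed = [
    ("consumer_a", "2024-01-01"),
    ("consumer_b", "2024-01-01"),
    ("consumer_a", "2024-01-01"),
]
duplicates = find_duplicate_runs(observed)
print(duplicates)
```

An empty result after the patch would be consistent with the fix working, though a race this intermittent likely needs many iterations before its absence is convincing.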
## Related
- Closes #68045
- Mirrors fix from #60773 (non-partitioned path)
- Related: #59183, #61831, #61433, #62441