Mercury2699 opened a new issue, #68045:
URL: https://github.com/apache/airflow/issues/68045
# Duplicate DAG runs for PartitionedAssetTimetable with multiple schedulers (HA): race not covered by #60773
## Apache Airflow version
3.2.1
## What happened?
Asset-triggered DAGs using `PartitionedAssetTimetable` or
`CronPartitionTimetable` are intermittently triggered twice for the same asset
event when running with 2 schedulers (HA mode).
PR #60773 ("Add row lock to ADRQ before Dag run creation") fixed this race
for **non-partitioned** asset scheduling by adding `with_row_locks` on
`AssetDagRunQueue` reads. However, the **partitioned asset** code path in
`scheduler_job_runner.py` appears to have a separate or parallel
DagRun-creation flow that is not protected by the same row lock. The result is
that two schedulers can both read the same unprocessed partitioned asset event
and each independently create a DagRun.
This is confirmed reproducible on MWAA 3.2.1 (image
`2026_05_12_23_43_Airflow_Image_3_2_1`) which includes #60773 (shipped in
3.2.0).
## What you think should happen instead?
A single partitioned asset event should trigger exactly one DAG run,
regardless of how many schedulers are running.
The partitioned-asset scheduling path should receive the same row-lock
treatment that #60773 applied to the non-partitioned path.
## How to reproduce
### Prerequisites
- Airflow 3.2.1 with **2 schedulers** (HA mode)
- PostgreSQL metadata database
### DAGs
**Producer** — emits a partitioned asset event every 5 minutes via
`CronPartitionTimetable`:
```python
import time
from datetime import datetime, timedelta
import pendulum
from airflow.sdk import DAG, Asset, CronPartitionTimetable
from airflow.providers.standard.operators.python import PythonOperator

ASSET_SHARED = Asset("test_v7b_shared_events")


def simulate_processing(**context):
    dag_run = context["dag_run"]
    # Fall back to logical_date when the run carries no partition key.
    partition_key = getattr(dag_run, "partition_key", None) or getattr(
        dag_run, "logical_date", None
    )
    print(f"Processing partition: {partition_key}")
    time.sleep(10)  # simulate work before the asset event is emitted


with DAG(
    dag_id="test_partitioned_asset_producer",
    start_date=pendulum.now("UTC").subtract(hours=2),
    schedule=CronPartitionTimetable("*/5 * * * *", timezone="UTC"),
    catchup=False,
    max_active_runs=1,
    tags=["repro", "partitioned_asset_race"],
) as dag:
    PythonOperator(
        task_id="process_data",
        python_callable=simulate_processing,
        outlets=[ASSET_SHARED],
    )
```
**Consumer Alpha** — uses `PartitionedAssetTimetable`:
```python
import time
from datetime import datetime
from airflow.sdk import DAG, Asset, PartitionedAssetTimetable
from airflow.providers.standard.operators.python import PythonOperator

ASSET_SHARED = Asset("test_v7b_shared_events")


def process_event(**context):
    print(f"Consumer ALPHA - Run ID: {context['dag_run'].run_id}")
    time.sleep(5)


with DAG(
    dag_id="test_partitioned_asset_consumer_alpha",
    start_date=datetime(2024, 1, 1),
    schedule=PartitionedAssetTimetable(assets=ASSET_SHARED),
    catchup=False,
    tags=["repro", "partitioned_asset_race", "consumer"],
) as dag:
    PythonOperator(task_id="process_event", python_callable=process_event)
```
**Consumer Beta** — identical schedule, different DAG ID:
```python
import time
from datetime import datetime
from airflow.sdk import DAG, Asset, PartitionedAssetTimetable
from airflow.providers.standard.operators.python import PythonOperator

ASSET_SHARED = Asset("test_v7b_shared_events")


def process_event(**context):
    print(f"Consumer BETA - Run ID: {context['dag_run'].run_id}")
    time.sleep(5)


with DAG(
    dag_id="test_partitioned_asset_consumer_beta",
    start_date=datetime(2024, 1, 1),
    schedule=PartitionedAssetTimetable(assets=ASSET_SHARED),
    catchup=False,
    tags=["repro", "partitioned_asset_race", "consumer"],
) as dag:
    PythonOperator(task_id="process_event", python_callable=process_event)
```
### Steps
1. Deploy all 3 DAGs with **2 schedulers** (HA).
2. Unpause all DAGs.
3. Let them run for at least 1 hour.
4. Check consumer DAG run history for duplicates.
### Key conditions
- Must have **2+ schedulers** (HA mode)
- Must have **2+ consumer DAGs** on the same asset (single consumer does NOT
trigger the bug)
- Must use **`PartitionedAssetTimetable`** — plain `Asset` schedule
(non-partitioned) does NOT reproduce (covered by #60773)
- The issue is intermittent (~40ms race window between schedulers)
### Evidence
Duplicate run IDs from production reproduction (timestamps about 116 ms apart,
random suffixes differ):
```
asset_triggered__2026-06-02T10:30:14.149756+00:00_EMV7bEJl
asset_triggered__2026-06-02T10:30:14.265600+00:00_ePLMw1Xd
```
Scheduler logs show both `job_id=3` and `job_id=6` independently creating
consumer runs within 40ms of each other from the same producer completion event.
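To check a longer run history for the same pattern, a small script along these lines may help. It is only a sketch: it assumes run IDs follow the `asset_triggered__<ISO timestamp>_<suffix>` shape seen in the two examples above, that the list comes from a single consumer DAG, and that a 1-second window is tight enough given the 5-minute producer schedule. `parse_run_id` and `find_near_duplicates` are hypothetical helper names, not Airflow APIs.

```python
from datetime import datetime, timedelta


def parse_run_id(run_id: str) -> tuple[datetime, str]:
    """Split 'asset_triggered__<ISO timestamp>_<suffix>' into (timestamp, suffix)."""
    _, rest = run_id.split("__", 1)
    stamp, suffix = rest.rsplit("_", 1)
    return datetime.fromisoformat(stamp), suffix


def find_near_duplicates(run_ids, window=timedelta(seconds=1)):
    """Return (earlier_id, later_id, gap) for neighbouring runs closer than `window`."""
    ordered = sorted(run_ids, key=lambda r: parse_run_id(r)[0])
    pairs = []
    for earlier, later in zip(ordered, ordered[1:]):
        gap = parse_run_id(later)[0] - parse_run_id(earlier)[0]
        if gap < window:
            pairs.append((earlier, later, gap))
    return pairs


# The two run IDs from the evidence above.
RUN_IDS = [
    "asset_triggered__2026-06-02T10:30:14.149756+00:00_EMV7bEJl",
    "asset_triggered__2026-06-02T10:30:14.265600+00:00_ePLMw1Xd",
]

for earlier, later, gap in find_near_duplicates(RUN_IDS):
    print(f"{earlier} and {later} are {gap.total_seconds() * 1000:.0f} ms apart")
```

For the two run IDs listed above, the script reports a single pair with a gap of about 116 ms.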
## Root cause analysis
PR #60773 added `with_row_locks` to the ADRQ fetch in
`_create_dag_runs_asset_triggered`. However, when `PartitionedAssetTimetable`
is used, the DagRun creation appears to flow through a different code path
(likely involving `AssetPartitionDagRun` / partition-specific logic) that does
not acquire the same exclusive lock before creating the run.
The fix should extend the row-lock strategy from #60773 to cover the
partitioned-asset scheduling path in `scheduler_job_runner.py`.
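The suspected race can be illustrated with a toy model. This is not Airflow code: the set `pending` stands in for unprocessed partitioned asset events, the list `created` for the DagRun table, and a `threading.Lock` for the row lock (`SELECT ... FOR UPDATE`) that #60773 takes on ADRQ rows. All names are hypothetical, and a barrier is used to force the problematic interleaving so the outcome is deterministic.

```python
import threading


def simulate(use_lock: bool, n_schedulers: int = 2) -> list[tuple[str, str]]:
    """Return the (scheduler, event) pairs for which a run was 'created'."""
    pending = {"partition-event-1"}      # stand-in for unprocessed queue rows
    created: list[tuple[str, str]] = []  # stand-in for the DagRun table
    row_lock = threading.Lock()          # stand-in for SELECT ... FOR UPDATE
    start = threading.Barrier(n_schedulers)

    def unlocked(name: str) -> None:
        seen = list(pending)  # read without holding any lock
        start.wait()          # every scheduler has read before any one writes
        for event in seen:
            created.append((name, event))
            pending.discard(event)

    def locked(name: str) -> None:
        start.wait()          # all schedulers start at the same moment
        with row_lock:        # read and create form one critical section
            for event in list(pending):
                created.append((name, event))
                pending.discard(event)

    target = locked if use_lock else unlocked
    threads = [
        threading.Thread(target=target, args=(f"scheduler-{i}",))
        for i in range(n_schedulers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return created


if __name__ == "__main__":
    print("without lock:", len(simulate(use_lock=False)), "runs")
    print("with lock:   ", len(simulate(use_lock=True)), "runs")
```

Without the lock, each simulated scheduler creates a run for the same event. With the lock, only the first scheduler to enter the critical section creates one, which is the behavior the partitioned path should also have.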
## Related issues
- #63507 — Duplicate DAG runs for asset-triggered scheduling (closed by
#60773, non-partitioned)
- #54491 — Original report (Airflow 3.0.4, non-partitioned)
- #60773 — The fix PR (row lock on ADRQ, merged Mar 17 2026)
- #59183, #61831, #61433, #62441 — Related partition concurrency fixes in
3.2.0/3.2.1
## Are you willing to submit PR?
- Yes I am willing to submit a PR!