hkc-8010 opened a new issue, #64710:
URL: https://github.com/apache/airflow/issues/64710
## Description
When Airflow is running with multiple scheduler replicas (the default on
Astronomer Hosted, where 2 scheduler replicas always run), the deadline check
loop in `scheduler_job_runner.py` has a **race condition** that causes
duplicate `CallbackTrigger` entries to be created for the same missed deadline.
This results in the deadline breach callback firing **multiple times** —
producing duplicate alerts/notifications.
With a single triggerer replica the duplicates may go unnoticed (they fire
in rapid succession from the same pod). With **two or more triggerer
replicas**, each triggerer pod claims one of the duplicate triggers and fires
its callback independently, making the problem clearly visible to users.
---
## Steps to Reproduce
1. Use Airflow 3 with a DAG that defines a `deadline` using `DeadlineAlert`
and a callback (e.g. `AsyncCallback`):
```python
from datetime import timedelta

from airflow.sdk import DAG, AsyncCallback
from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference

with DAG(
    dag_id="crm-braze-cdi-daily",
    schedule="@daily",
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(hours=1),
        callback=AsyncCallback(path="my_package.callbacks.on_deadline_breach"),
    ),
) as dag:
    ...
```
2. Run Airflow with **2 scheduler replicas** (the default on Astro Hosted).
3. Run Airflow with **2 triggerer replicas** (e.g.
`ASTRO_TRIGGERER_REPLICAS=2`).
4. Allow a DAG run to exceed its deadline.
**Expected:** The deadline breach callback fires **once**.
**Actual:** The deadline breach callback fires **twice** — once per
triggerer replica.
---
## Root Cause Analysis
### 1. The scheduler deadline check loop lacks row-level locking
In `scheduler_job_runner.py`, the scheduling loop checks for missed
deadlines on every iteration:
```python
# airflow/jobs/scheduler_job_runner.py, lines 1626-1635
with create_session() as session:
    # Only retrieve expired deadlines that haven't been processed yet.
    # `missed` is False by default until the handler sets it.
    for deadline in session.scalars(
        select(Deadline)
        .where(Deadline.deadline_time < datetime.now(timezone.utc))
        .where(~Deadline.missed)  # ← NO SELECT ... FOR UPDATE
        .options(selectinload(Deadline.callback), selectinload(Deadline.dagrun))
    ):
        deadline.handle_miss(session)
```
There is **no `with_for_update()`** (pessimistic locking) on this query.
Under PostgreSQL's default `READ COMMITTED` isolation, two scheduler replicas
executing this query concurrently can **both read the same `Deadline` row**
with `missed=False` before either has committed its transaction.
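The window can be illustrated without a database. The pattern is a classic check-then-act race: both replicas snapshot the flag before either one writes it back, so both proceed. A minimal pure-Python sketch (names are illustrative, not Airflow code):

```python
# Toy model of the check-then-act race: both "schedulers" snapshot
# missed=False before either one marks the deadline as handled.
deadline = {"missed": False}
triggers = []  # stands in for rows inserted into the trigger table

def scheduler_reads(state):
    # Each replica takes its own snapshot of the row (under READ
    # COMMITTED, the value seen at SELECT time, not at write time).
    return dict(state)

snapshot_a = scheduler_reads(deadline)  # Scheduler A at T1
snapshot_b = scheduler_reads(deadline)  # Scheduler B at T1

def handle_miss(snapshot, state, name):
    # No lock: the decision is based on the stale snapshot.
    if not snapshot["missed"]:
        triggers.append(f"trigger-from-{name}")  # new trigger row
        state["missed"] = True  # idempotent write hides the duplicate

handle_miss(snapshot_a, deadline, "A")  # T2: A commits
handle_miss(snapshot_b, deadline, "B")  # T3: B commits
```

After both calls, `triggers` holds two entries for the single deadline, exactly mirroring the duplicate `CallbackTrigger` rows described below.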
### 2. `handle_miss()` creates a new `Trigger` record each time it is called
```python
# airflow/models/deadline.py, lines 216-264
def handle_miss(self, session: Session):
    ...
    if isinstance(self.callback, TriggererCallback):
        self.callback.queue()  # ← creates a NEW Trigger row in the DB
        session.add(self.callback)
        session.flush()
    ...
    self.missed = True
    session.add(self)
```
`TriggererCallback.queue()` calls
`Trigger.from_object(CallbackTrigger(...))`, inserting a **new row** into the
`trigger` table:
```python
# airflow/models/callback.py, lines 214-224
def queue(self):
    from airflow.models.trigger import Trigger
    from airflow.triggers.callback import CallbackTrigger

    self.trigger = Trigger.from_object(
        CallbackTrigger(
            callback_path=self.data["path"],
            callback_kwargs=self.data["kwargs"],
        )
    )
    super().queue()
```
### 3. Race condition sequence (2 scheduler replicas)
```
Time →   Scheduler A                               Scheduler B
T1       reads Deadline X (missed=False)
T1                                                 reads Deadline X (missed=False)
T2       handle_miss() → Trigger 1 created,
         missed=True, COMMIT
T3                                                 handle_miss() → Trigger 2 created,
                                                   missed=True, COMMIT
```
At T3, Scheduler B's session still holds the stale `missed=False` value from
T1 (READ COMMITTED doesn't re-read within the same transaction). B's update of
`missed=True` succeeds silently (idempotent), but **Trigger 2 already exists in
the database**.
### 4. Two triggerer replicas pick up one trigger each
`Trigger.assign_unassigned()` distributes unassigned triggers across
available triggerer instances. With 2 triggerer replicas, each pod claims one
of the two `CallbackTrigger` rows and executes the deadline breach callback
independently → **2 notifications sent**.
With only **1 triggerer replica**, both triggers are picked up by the same
pod and fire back-to-back. This is still a bug (fires twice), but it is much
harder to notice because the notifications arrive nearly simultaneously.
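The visibility difference can be sketched with a toy distribution model. Round-robin assignment here is an assumption for illustration (the real logic lives in `Trigger.assign_unassigned()`); the point is only that two duplicate rows on two pods mean two independent callback firings:

```python
# Toy model of distributing unassigned trigger rows across triggerer
# pods. Round-robin is assumed purely for illustration.
def assign_unassigned(trigger_ids, triggerer_ids):
    assignments = {t: [] for t in triggerer_ids}
    for i, trig in enumerate(trigger_ids):
        assignments[triggerer_ids[i % len(triggerer_ids)]].append(trig)
    return assignments

duplicates = ["trigger-1", "trigger-2"]  # same deadline, created twice

# Two pods: each claims one duplicate and fires independently.
two_pods = assign_unassigned(duplicates, ["triggerer-a", "triggerer-b"])

# One pod: both duplicates land on the same pod and fire back-to-back.
one_pod = assign_unassigned(duplicates, ["triggerer-a"])
```

In both layouts the callback fires twice; only the two-pod layout makes the duplication obvious to users.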
---
## Proposed Fix
Add `.with_for_update(skip_locked=True)` to the deadline query in the
scheduler loop. This ensures that when Scheduler A is processing a deadline,
Scheduler B will skip it (rather than both processing it simultaneously):
```python
# scheduler_job_runner.py
with create_session() as session:
    for deadline in session.scalars(
        select(Deadline)
        .where(Deadline.deadline_time < datetime.now(timezone.utc))
        .where(~Deadline.missed)
        .with_for_update(skip_locked=True)  # ← ADD THIS
        .options(selectinload(Deadline.callback), selectinload(Deadline.dagrun))
    ):
        deadline.handle_miss(session)
```
`skip_locked=True` is preferred over `nowait=True` because it prevents the
second scheduler from erroring out — it simply skips the row that is already
locked by the first scheduler, which is the correct behaviour.
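The `SKIP LOCKED` semantics can be modelled in pure Python with a non-blocking try-lock: the second worker simply does not see the row the first one holds, and any row it does acquire is re-checked under the lock. A sketch (a stand-in for the SQL behaviour, not Airflow code):

```python
import threading

# Model of SELECT ... FOR UPDATE SKIP LOCKED: each row carries a lock,
# and a worker only processes rows it can lock without blocking;
# rows held by another worker are silently skipped, not waited on.
rows = [{"id": 1, "lock": threading.Lock(), "missed": False}]
created = []  # trigger rows that get created

def check_deadlines(worker):
    for row in rows:
        if not row["lock"].acquire(blocking=False):
            continue  # another scheduler holds the row: skip it
        try:
            if not row["missed"]:      # state re-read under the lock
                created.append(worker) # exactly one trigger is created
                row["missed"] = True
        finally:
            row["lock"].release()

a = threading.Thread(target=check_deadlines, args=("A",))
b = threading.Thread(target=check_deadlines, args=("B",))
a.start(); b.start(); a.join(); b.join()
# Whichever replica wins the lock creates the trigger; the loser
# either skips the locked row or observes missed=True afterwards.
```

Either way the loser does no work, which is exactly the single-firing behaviour the fix is after.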
Alternatively, a **unique constraint** on `(dagrun_id, deadline_alert_id)`
in the `deadline` table combined with an upsert pattern would also prevent the
duplicate trigger creation, but the `SELECT FOR UPDATE / SKIP LOCKED` approach
is lower risk and more consistent with how similar patterns are handled
elsewhere in Airflow (e.g., trigger assignment).
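For completeness, the unique-constraint alternative can be sketched with SQLite (stdlib) standing in for PostgreSQL; `ON CONFLICT ... DO NOTHING` is supported by both. Table and column names here are illustrative, not the actual Airflow schema:

```python
import sqlite3

# Sketch of the unique-constraint alternative: uniqueness on
# (dagrun_id, deadline_alert_id) makes the racing second insert a
# no-op instead of a duplicate row.
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE deadline_trigger (
           dagrun_id INTEGER,
           deadline_alert_id INTEGER,
           UNIQUE (dagrun_id, deadline_alert_id)
       )"""
)

def queue_trigger(dagrun_id, alert_id):
    # Upsert pattern: the losing scheduler's insert silently does nothing.
    conn.execute(
        "INSERT INTO deadline_trigger VALUES (?, ?) "
        "ON CONFLICT (dagrun_id, deadline_alert_id) DO NOTHING",
        (dagrun_id, alert_id),
    )

queue_trigger(42, 7)  # Scheduler A
queue_trigger(42, 7)  # Scheduler B races; the insert is a no-op
count = conn.execute("SELECT COUNT(*) FROM deadline_trigger").fetchone()[0]
```

This closes the race at the database layer but requires a schema migration, which is why the issue favours the locking approach.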
---
## Impact
| Scenario | Behaviour |
|---|---|
| 1 scheduler, 1 triggerer | No duplicates (correct) |
| 2 schedulers, 1 triggerer | 2 triggers created; same pod fires the callback twice (easy to miss) |
| 2 schedulers, 2 triggerers | 2 triggers created; each pod fires the callback once → **user sees 2 notifications** |
The bug is latent whenever multiple scheduler replicas run (the default on
Astronomer Hosted and most HA Airflow deployments). Adding a second triggerer
makes it clearly visible because the two duplicate triggers are now distributed
across two independent pods.
---
## Environment
- **Airflow version:** 3.0.x / 3.1.x (confirmed in `apache/airflow` `main`
branch as of 2026-04-02)
- **Database:** PostgreSQL (READ COMMITTED isolation level)
- **Scheduler replicas:** 2 (standard Astro Hosted configuration)
- **Triggerer replicas:** 2 (set via `ASTRO_TRIGGERER_REPLICAS=2`)
- **Executor:** AstroExecutor
---
## Related Issues / PRs
- [apache/airflow#64620](https://github.com/apache/airflow/issues/64620) —
Triggerer `TriggerCommsDecoder` `asyncio.Lock` race condition (separate but
related triggerer stability issue on the same deployment)