hkc-8010 opened a new issue, #64710:
URL: https://github.com/apache/airflow/issues/64710

   ## Description
   
   When Airflow is running with multiple scheduler replicas (the default on 
Astronomer Hosted, where 2 scheduler replicas always run), the deadline check 
loop in `scheduler_job_runner.py` has a **race condition** that causes 
duplicate `CallbackTrigger` entries to be created for the same missed deadline. 
This results in the deadline breach callback firing **multiple times** — 
producing duplicate alerts/notifications.
   
   With a single triggerer replica the duplicates may go unnoticed (they fire 
in rapid succession from the same pod). With **two or more triggerer 
replicas**, each triggerer pod claims one of the duplicate triggers and fires 
its callback independently, making the problem clearly visible to users.
   
   ---
   
   ## Steps to Reproduce
   
   1. Use Airflow 3 with a DAG that defines a `deadline` using `DeadlineAlert` 
and a callback (e.g. `AsyncCallback`):
   
      ```python
      from datetime import timedelta

      from airflow.sdk import DAG, AsyncCallback
      from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference

      with DAG(
          dag_id="crm-braze-cdi-daily",
          schedule="@daily",
          deadline=DeadlineAlert(
              reference=DeadlineReference.DAGRUN_QUEUED_AT,
              interval=timedelta(hours=1),
              callback=AsyncCallback(path="my_package.callbacks.on_deadline_breach"),
          ),
      ) as dag:
          ...
      ```
   
   2. Run Airflow with **2 scheduler replicas** (the default on Astro Hosted).
   3. Run Airflow with **2 triggerer replicas** (e.g. 
`ASTRO_TRIGGERER_REPLICAS=2`).
   4. Allow a DAG run to exceed its deadline.
   
   **Expected:** The deadline breach callback fires **once**.
   
   **Actual:** The deadline breach callback fires **twice** — once per 
triggerer replica.
   
   ---
   
   ## Root Cause Analysis
   
   ### 1. The scheduler deadline check loop lacks row-level locking
   
   In `scheduler_job_runner.py`, the scheduling loop checks for missed 
deadlines on every iteration:
   
    ```python
    # airflow/jobs/scheduler_job_runner.py, lines 1626-1635
    with create_session() as session:
        # Only retrieve expired deadlines that haven't been processed yet.
        # `missed` is False by default until the handler sets it.
        for deadline in session.scalars(
            select(Deadline)
            .where(Deadline.deadline_time < datetime.now(timezone.utc))
            .where(~Deadline.missed)                      # ← NO SELECT FOR UPDATE
            .options(selectinload(Deadline.callback), selectinload(Deadline.dagrun))
        ):
            deadline.handle_miss(session)
    ```
   
   There is **no `with_for_update()`** (pessimistic locking) on this query. 
Under PostgreSQL's default `READ COMMITTED` isolation, two scheduler replicas 
executing this query concurrently can **both read the same `Deadline` row** 
with `missed=False` before either has committed its transaction.
   
   ### 2. `handle_miss()` creates a new `Trigger` record each time it is called
   
    ```python
    # airflow/models/deadline.py, lines 216-264
    def handle_miss(self, session: Session):
        ...
        if isinstance(self.callback, TriggererCallback):
            self.callback.queue()      # ← creates a NEW Trigger row in the DB
            session.add(self.callback)
            session.flush()
        ...
        self.missed = True
        session.add(self)
    ```
   
   `TriggererCallback.queue()` calls 
`Trigger.from_object(CallbackTrigger(...))`, inserting a **new row** into the 
`trigger` table:
   
   ```python
   # airflow/models/callback.py, lines 214-224
   def queue(self):
       from airflow.models.trigger import Trigger
       from airflow.triggers.callback import CallbackTrigger
   
       self.trigger = Trigger.from_object(
           CallbackTrigger(
               callback_path=self.data["path"],
               callback_kwargs=self.data["kwargs"],
           )
       )
       super().queue()
   ```
   
   ### 3. Race condition sequence (2 scheduler replicas)
   
    ```
    Time →     Scheduler A                                            Scheduler B
    T1         reads Deadline X (missed=False)
    T1                                                                reads Deadline X (missed=False)
    T2         handle_miss() → Trigger 1 created, missed=True, COMMIT
    T3                                                                handle_miss() → Trigger 2 created, missed=True, COMMIT
    ```
   
    At T3, Scheduler B still acts on the `missed=False` value it read at T1: the 
ORM session keeps the loaded object in its identity map and never re-reads the 
row, and B's subsequent `UPDATE` does not re-check the flag. B's write of 
`missed=True` succeeds silently (it is idempotent), but **Trigger 2 already 
exists in the database**.
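
    The check-then-act race above can be reproduced in miniature without a 
database. The sketch below uses plain Python threads; the `deadline` dict and 
`trigger_table` list are simplified stand-ins for the real ORM models, and the 
`sleep` forces the interleaving shown in the timeline:

```python
import threading
import time

# Simplified stand-ins for the Deadline row and the trigger table.
deadline = {"missed": False}
trigger_table = []

def scheduler_loop(name: str) -> None:
    # T1: read the row -- no SELECT ... FOR UPDATE equivalent, so both
    # replicas can observe missed=False before either commits.
    saw_missed = deadline["missed"]
    time.sleep(0.1)  # force the interleaving from the timeline above
    if not saw_missed:
        # T2/T3: handle_miss() -- queue a trigger, then flip the flag.
        trigger_table.append(f"CallbackTrigger queued by {name}")
        deadline["missed"] = True  # idempotent write hides the duplicate

a = threading.Thread(target=scheduler_loop, args=("scheduler-a",))
b = threading.Thread(target=scheduler_loop, args=("scheduler-b",))
a.start(); b.start(); a.join(); b.join()

print(len(trigger_table))  # → 2: one duplicate trigger per scheduler
```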
   
   ### 4. Two triggerer replicas pick up one trigger each
   
   `Trigger.assign_unassigned()` distributes unassigned triggers across 
available triggerer instances. With 2 triggerer replicas, each pod claims one 
of the two `CallbackTrigger` rows and executes the deadline breach callback 
independently → **2 notifications sent**.
   
   With only **1 triggerer replica**, both triggers are picked up by the same 
pod and fire back-to-back. This is still a bug (fires twice), but it is much 
harder to notice because the notifications arrive nearly simultaneously.
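
    The distribution step can be sketched as a simplified model. The real 
`Trigger.assign_unassigned()` works through the database and respects capacity; 
this only illustrates the effect of two unassigned duplicate triggers meeting 
two triggerer pods:

```python
from itertools import cycle

# The two duplicate CallbackTrigger rows left behind by the race.
unassigned_triggers = ["CallbackTrigger #1", "CallbackTrigger #2"]
triggerer_pods = ["triggerer-a", "triggerer-b"]

# Hand out unassigned triggers round-robin across the available pods.
assignment: dict[str, list[str]] = {}
for trigger, pod in zip(unassigned_triggers, cycle(triggerer_pods)):
    assignment.setdefault(pod, []).append(trigger)

print(assignment)  # each pod claims one duplicate → the callback fires twice
```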
   
   ---
   
   ## Proposed Fix
   
   Add `.with_for_update(skip_locked=True)` to the deadline query in the 
scheduler loop. This ensures that when Scheduler A is processing a deadline, 
Scheduler B will skip it (rather than both processing it simultaneously):
   
    ```python
    # scheduler_job_runner.py
    with create_session() as session:
        for deadline in session.scalars(
            select(Deadline)
            .where(Deadline.deadline_time < datetime.now(timezone.utc))
            .where(~Deadline.missed)
            .with_for_update(skip_locked=True)            # ← ADD THIS
            .options(selectinload(Deadline.callback), selectinload(Deadline.dagrun))
        ):
            deadline.handle_miss(session)
    ```
   
   `skip_locked=True` is preferred over `nowait=True` because it prevents the 
second scheduler from erroring out — it simply skips the row that is already 
locked by the first scheduler, which is the correct behaviour.
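
    The effect of the added clause can be checked without a live database by 
compiling the statement against the PostgreSQL dialect. The `Deadline` model 
below is a minimal stand-in, not the real Airflow model:

```python
from sqlalchemy import Boolean, Column, DateTime, Integer, select
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Deadline(Base):
    # Minimal stand-in for airflow.models.deadline.Deadline.
    __tablename__ = "deadline"
    id = Column(Integer, primary_key=True)
    deadline_time = Column(DateTime)
    missed = Column(Boolean, default=False)

stmt = (
    select(Deadline)
    .where(~Deadline.missed)
    .with_for_update(skip_locked=True)
)

# Render the SQL that PostgreSQL would receive.
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)  # ends with: ... FOR UPDATE SKIP LOCKED
```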
   
   Alternatively, a **unique constraint** on `(dagrun_id, deadline_alert_id)` 
in the `deadline` table combined with an upsert pattern would also prevent the 
duplicate trigger creation, but the `SELECT FOR UPDATE / SKIP LOCKED` approach 
is lower risk and more consistent with how similar patterns are handled 
elsewhere in Airflow (e.g., trigger assignment).
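
    For reference, that alternative would look roughly like the following. This 
is a hedged sketch: the column names, constraint name, and values are 
illustrative, not the real `deadline` schema:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, UniqueConstraint
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert as pg_insert

metadata = MetaData()

# Illustrative schema only; column and constraint names are assumptions.
deadline_table = Table(
    "deadline",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("dagrun_id", Integer, nullable=False),
    Column("deadline_alert_id", String, nullable=False),
    UniqueConstraint("dagrun_id", "deadline_alert_id", name="uq_deadline_dagrun_alert"),
)

# INSERT ... ON CONFLICT DO NOTHING: a second scheduler's insert for the
# same (dagrun_id, deadline_alert_id) pair becomes a silent no-op.
stmt = pg_insert(deadline_table).values(
    dagrun_id=42,
    deadline_alert_id="example-alert",
).on_conflict_do_nothing(constraint="uq_deadline_dagrun_alert")

sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)  # contains: ON CONFLICT ON CONSTRAINT uq_deadline_dagrun_alert DO NOTHING
```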
   
   ---
   
   ## Impact
   
    | Scenario | Behaviour |
    |---|---|
    | 1 scheduler, 1 triggerer | No visible duplicate (correct) |
    | 2 schedulers, 1 triggerer | 2 triggers created; same pod fires callback twice (may be invisible) |
    | 2 schedulers, 2 triggerers | 2 triggers created; each pod fires callback once → **user sees 2 notifications** |
   
   The bug is latent whenever multiple scheduler replicas run (the default on 
Astronomer Hosted and most HA Airflow deployments). Adding a second triggerer 
makes it clearly visible because the two duplicate triggers are now distributed 
across two independent pods.
   
   ---
   
   ## Environment
   
   - **Airflow version:** 3.0.x / 3.1.x (confirmed in `apache/airflow` `main` 
branch as of 2026-04-02)
   - **Database:** PostgreSQL (READ COMMITTED isolation level)
   - **Scheduler replicas:** 2 (standard Astro Hosted configuration)
   - **Triggerer replicas:** 2 (set via `ASTRO_TRIGGERER_REPLICAS=2`)
   - **Executor:** AstroExecutor
   
   ---
   
   ## Related Issues / PRs
   
   - [apache/airflow#64620](https://github.com/apache/airflow/issues/64620) — 
Triggerer `TriggerCommsDecoder` `asyncio.Lock` race condition (separate but 
related triggerer stability issue on the same deployment)

