Prasanth345 commented on issue #53610:
URL: https://github.com/apache/airflow/issues/53610#issuecomment-3131880599

   Hi @jscheffl 
   
   I've reproduced the issue described here using the EdgeExecutor together with a DAG whose task re-enters the queue: either a sensor in reschedule mode (e.g., `ExternalTaskSensor` with `mode="reschedule"`) or a custom deferrable `BaseSensorOperator` using `.defer(...)`. When such a task is retried or rescheduled, the scheduler attempts to insert a duplicate entry into the `edge_job` table, violating the primary-key constraint and raising an `IntegrityError`, which can crash the scheduler.
   
   ### **Root Cause**
   
   The issue occurs because both:
   
   -   `execute_async()` (called during deferred task scheduling), and
   -   `queue_workload()` (called by EdgeWorker when picking up a workload)
   
   attempt to insert a new `EdgeJobModel` row with the same composite key (`dag_id`, `task_id`, `run_id`, `map_index`, `try_number`) without first checking whether a record already exists.
   
   Since the composite key is already present (especially for 
deferrable/rescheduled tasks that re-enter the queue), this results in a 
**duplicate key violation** in Postgres.
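   The collision is easy to demonstrate in isolation. Below is a minimal sketch using a hypothetical stand-in model (`JobStub`, not the real `EdgeJobModel`) that shares the same five-column composite primary key; the second insert for an identical key raises the same `IntegrityError` the scheduler hits:

    ```python
    # Hypothetical stand-in for EdgeJobModel with the same composite key.
    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.exc import IntegrityError
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()

    class JobStub(Base):
        __tablename__ = "edge_job_stub"
        dag_id = Column(String, primary_key=True)
        task_id = Column(String, primary_key=True)
        run_id = Column(String, primary_key=True)
        map_index = Column(Integer, primary_key=True)
        try_number = Column(Integer, primary_key=True)

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    key = dict(dag_id="d1", task_id="t1", run_id="r1", map_index=-1, try_number=1)
    with Session(engine) as session:
        session.add(JobStub(**key))
        session.commit()
        # Second insert with an identical key, mirroring execute_async() and
        # queue_workload() both enqueuing the same task instance.
        session.add(JobStub(**key))
        try:
            session.commit()
        except IntegrityError:
            session.rollback()
            print("duplicate key violation")
    ```

   (The stand-in uses SQLite for convenience; Postgres raises the same constraint violation.)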
   
   ### **Proposed Solution**
   
   I have updated both `execute_async()` and `queue_workload()` to check 
whether a job record already exists **before inserting**. This prevents 
unintentional duplicate inserts and protects the scheduler from crashing.
   
   File: 
`airflow/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py`
   
   ### **Changes made:**
   
   -   In `execute_async()`:
        Check whether a job already exists for the task key; if so, skip the insert.
    ```python
    # Skip the insert if a job already exists with the same composite key
    # (dag_id, task_id, run_id, map_index, try_number).
    existing_job = (
        session.query(EdgeJobModel)
        .filter_by(
            dag_id=key.dag_id,
            task_id=key.task_id,
            run_id=key.run_id,
            map_index=key.map_index,
            try_number=key.try_number,
        )
        .first()
    )

    if existing_job:
        self.log.info("EdgeExecutor: Skipping duplicate insert for %s", key)
        return
    ```
   
   -   In `queue_workload()`:
        The same check is applied to avoid blind inserts when the EdgeWorker queues the workload.
    ```python
    # Skip the insert if a job already exists with the same composite key
    # (dag_id, task_id, run_id, map_index, try_number).
    existing_job = (
        session.query(EdgeJobModel)
        .filter_by(
            dag_id=key.dag_id,
            task_id=key.task_id,
            run_id=key.run_id,
            map_index=key.map_index,
            try_number=key.try_number,
        )
        .first()
    )

    if existing_job:
        self.log.info("EdgeExecutor: Skipping duplicate insert for %s in queue_workload", key)
        return
    ```
   
   ### **Rationale**
   
   -   This change avoids data corruption and scheduler failure without 
changing existing behavior.
   -   The fix is minimal, localized, and aligns with existing Airflow 
conventions (e.g., safe session use and idempotent task enqueuing).
   -   Alternative considered: using `ON CONFLICT DO NOTHING`, but that would silently discard unexpected duplicate rows; an explicit check gives better visibility and logging.
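   For comparison, the rejected alternative can be sketched with SQLAlchemy's PostgreSQL dialect, which supports `ON CONFLICT DO NOTHING` directly (again using a hypothetical stand-in model, not the real `EdgeJobModel`):

    ```python
    # Sketch of the rejected ON CONFLICT DO NOTHING alternative, compiled
    # against the PostgreSQL dialect; JobStub is a hypothetical stand-in.
    from sqlalchemy import Column, Integer, String
    from sqlalchemy.dialects import postgresql
    from sqlalchemy.dialects.postgresql import insert
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()

    class JobStub(Base):
        __tablename__ = "edge_job"
        dag_id = Column(String, primary_key=True)
        task_id = Column(String, primary_key=True)
        run_id = Column(String, primary_key=True)
        map_index = Column(Integer, primary_key=True)
        try_number = Column(Integer, primary_key=True)

    stmt = insert(JobStub).values(
        dag_id="d1", task_id="t1", run_id="r1", map_index=-1, try_number=1
    ).on_conflict_do_nothing(
        index_elements=["dag_id", "task_id", "run_id", "map_index", "try_number"]
    )
    print(stmt.compile(dialect=postgresql.dialect()))
    ```

   The drawback is the one noted above: the duplicate is dropped at the database level with no log line, so the explicit pre-check was preferred.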
   
   ### **Testing**
   
   Below is the DAG used to reliably reproduce the issue and verify the fix:
   
   ```python
   from airflow import DAG
   from airflow.sensors.base import BaseSensorOperator
   from airflow.triggers.base import BaseTrigger, TriggerEvent
   from airflow.exceptions import AirflowSkipException
   from airflow.utils.context import Context
   from datetime import datetime, timedelta
   
   class DummyTrigger(BaseTrigger):
        def serialize(self):
            # NOTE: this classpath is only importable when the file runs as
            # __main__; in a deployed DAG, use the DAG file's module path so
            # the triggerer can import the class.
            return ("__main__.DummyTrigger", {})
   
       async def run(self):
           import asyncio
           await asyncio.sleep(65)  # Delays past timeout to simulate bug
           yield TriggerEvent("done")
   
   class CustomDeferrableSensor(BaseSensorOperator):
        def execute(self, context: Context):
            # defer() expects a timedelta timeout, not a bare int
            self.defer(
                trigger=DummyTrigger(),
                method_name="execute_complete",
                timeout=timedelta(seconds=60),
            )
   
       def execute_complete(self, context: Context, event=None):
           if event == "done":
               return "Success"
           raise AirflowSkipException("Did not complete")
   
   with DAG(
       dag_id="example_deferrable_timeout",
       start_date=datetime(2025, 7, 1),
       catchup=False,
       schedule=None,
   ) as dag:
       task = CustomDeferrableSensor(
           task_id="trigger_timeout_test",
           retries=1,
           retry_delay=timedelta(seconds=5),
       )
   
    ```
   
   -   Used a custom DAG with a deferrable task 
(`BaseSensorOperator.defer(...)`) that defers beyond a timeout to simulate 
rescheduling.
   -   Verified that the task no longer crashes the scheduler on 
retry/reschedule.
   -   Also verified that once fixed, the DAG proceeds correctly after 
re-queuing and the deferred task is handled as expected.
   
   
   
   Please let me know if this looks good. I’m happy to raise a PR with the 
patch if we agree this is the right direction.
   
   Thanks!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
