Prasanth345 commented on issue #53610:
URL: https://github.com/apache/airflow/issues/53610#issuecomment-3131880599
Hi @jscheffl
I've successfully reproduced the issue described here using a DAG that
employs a deferrable sensor (e.g., `ExternalTaskSensor` with `mode="reschedule"`,
or a custom `BaseSensorOperator` that calls `.defer(...)`) in combination with the
EdgeExecutor. When such deferrable tasks are retried or rescheduled, the
scheduler attempts to insert a duplicate entry into the `edge_job` table,
violating the primary key constraint and causing an `IntegrityError`, which can
crash the scheduler.
### **Root Cause**
The issue occurs because both of the following insert a new `EdgeJobModel` row
with the same composite key (`dag_id`, `task_id`, `run_id`, `map_index`,
`try_number`) without first checking whether a record already exists:
- `execute_async()` (called during deferred task scheduling)
- `queue_workload()` (called by the EdgeWorker when picking up a workload)
Since the composite key is already present (especially for
deferrable/rescheduled tasks that re-enter the queue), this results in a
**duplicate key violation** in Postgres.
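The failure mode can be sketched outside Airflow. The model below is a hypothetical stand-in with the same composite primary key as `EdgeJobModel`, and SQLite replaces Postgres purely so the snippet is self-contained:

```python
# Minimal sketch of the duplicate-key failure, assuming a stand-in model
# with the same composite primary key as EdgeJobModel. SQLite is used in
# place of Postgres so the snippet runs anywhere.
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class JobModel(Base):  # hypothetical stand-in for EdgeJobModel
    __tablename__ = "edge_job"
    dag_id = Column(String, primary_key=True)
    task_id = Column(String, primary_key=True)
    run_id = Column(String, primary_key=True)
    map_index = Column(Integer, primary_key=True)
    try_number = Column(Integer, primary_key=True)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

row = dict(dag_id="d", task_id="t", run_id="r", map_index=-1, try_number=1)
with Session(engine) as session:
    session.add(JobModel(**row))
    session.commit()
    # A deferred/rescheduled task re-enters the queue with the same key:
    session.add(JobModel(**row))
    try:
        session.commit()
        outcome = "inserted"
    except IntegrityError:
        session.rollback()
        outcome = "duplicate key violation"
```

The second commit fails on the composite primary key, which is exactly what the scheduler hits when a deferred task is re-queued.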
### **Proposed Solution**
I have updated both `execute_async()` and `queue_workload()` to check
whether a job record already exists **before inserting**. This prevents
unintentional duplicate inserts and protects the scheduler from crashing.
File:
`airflow/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py`
### **Changes made:**
- In `execute_async()`:
Checked if a job already exists for the task key. If not, insert it.
Otherwise, skip.
```python
# Skip the insert if a job already exists with the same composite key
# (dag_id, task_id, run_id, map_index, try_number).
existing_job = (
    session.query(EdgeJobModel)
    .filter_by(
        dag_id=key.dag_id,
        task_id=key.task_id,
        run_id=key.run_id,
        map_index=key.map_index,
        try_number=key.try_number,
    )
    .first()
)
if existing_job:
    self.log.info("EdgeExecutor: Skipping duplicate insert for %s", key)
    return
```
- In `queue_workload()`:
Same logic added to avoid blind inserts when EdgeWorker queues the
workload.
```python
# Same check on the worker side: skip the insert if a job already exists
# with the same composite key.
existing_job = (
    session.query(EdgeJobModel)
    .filter_by(
        dag_id=key.dag_id,
        task_id=key.task_id,
        run_id=key.run_id,
        map_index=key.map_index,
        try_number=key.try_number,
    )
    .first()
)
if existing_job:
    self.log.info("EdgeExecutor: Skipping duplicate insert for %s in queue_workload()", key)
    return
```
### **Rationale**
- This change avoids data corruption and scheduler failure without
changing existing behavior.
- The fix is minimal, localized, and aligns with existing Airflow
conventions (e.g., safe session use and idempotent task enqueuing).
- Alternative considered: using **ON CONFLICT DO NOTHING**, but this
would silently swallow unexpected duplicates; the explicit check gives
better visibility and logging.
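For comparison, the rejected alternative would look roughly like this. It is a sketch using SQLAlchemy's dialect-level `on_conflict_do_nothing()`, with SQLite standing in for Postgres (`sqlalchemy.dialects.postgresql.insert` offers the same method) and a hypothetical cut-down model:

```python
# Sketch of the rejected ON CONFLICT DO NOTHING approach: the duplicate
# insert is dropped silently, with no exception and nothing to log.
# SQLite stands in for Postgres; JobModel is a hypothetical stand-in.
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.dialects.sqlite import insert  # postgresql dialect has the same API
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class JobModel(Base):
    __tablename__ = "edge_job"
    dag_id = Column(String, primary_key=True)
    try_number = Column(Integer, primary_key=True)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

stmt = (
    insert(JobModel)
    .values(dag_id="d", try_number=1)
    .on_conflict_do_nothing()
)
with Session(engine) as session:
    session.execute(stmt)
    session.execute(stmt)  # duplicate silently ignored; no IntegrityError raised
    session.commit()
    count = session.query(JobModel).count()
```

Only one row survives, but the second attempt leaves no trace, which is why the explicit check-and-log was preferred.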
### **Testing**
Below is the DAG used to reliably reproduce the issue and verify the fix:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.utils.context import Context


class DummyTrigger(BaseTrigger):
    def serialize(self):
        return ("__main__.DummyTrigger", {})

    async def run(self):
        import asyncio

        await asyncio.sleep(65)  # Delays past the timeout to simulate the bug
        yield TriggerEvent("done")


class CustomDeferrableSensor(BaseSensorOperator):
    def execute(self, context: Context):
        self.defer(
            trigger=DummyTrigger(),
            method_name="execute_complete",
            timeout=timedelta(seconds=60),
        )

    def execute_complete(self, context: Context, event=None):
        if event == "done":
            return "Success"
        raise AirflowSkipException("Did not complete")


with DAG(
    dag_id="example_deferrable_timeout",
    start_date=datetime(2025, 7, 1),
    catchup=False,
    schedule=None,
) as dag:
    task = CustomDeferrableSensor(
        task_id="trigger_timeout_test",
        retries=1,
        retry_delay=timedelta(seconds=5),
    )
```
- Used a custom DAG with a deferrable task
(`BaseSensorOperator.defer(...)`) that defers beyond a timeout to simulate
rescheduling.
- Verified that the task no longer crashes the scheduler on
retry/reschedule.
- Also verified that once fixed, the DAG proceeds correctly after
re-queuing and the deferred task is handled as expected.
Please let me know if this looks good. I’m happy to raise a PR with the
patch if we agree this is the right direction.
Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]