1fanwang opened a new issue, #66784:
URL: https://github.com/apache/airflow/issues/66784

   ### Apache Airflow version
   
   main (3.x)
   
   ### What happened?
   
   When a sensor runs in `mode="reschedule"`, the task instance's `start_date` 
is overwritten on every poke instead of preserved at the first-poke value. The 
supervisor sends `start_date=utcnow()` as part of the `ti_run` execution-API 
payload on each re-execution, and the endpoint at 
`airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py::ti_run`
 writes that value through unconditionally. A guard already exists for deferred 
tasks (lines 180-182: `if ti.next_kwargs: data.pop("start_date")`), but no 
equivalent guard exists for rescheduled tasks.
   
   There is a legacy guard in 
`airflow-core/src/airflow/models/taskinstance.py::_check_and_change_state_before_execution`
 at ~line 1315:
   
   ```python
   ti.start_date = ti.start_date if ti.next_method else timezone.utcnow()
   if ti.state == TaskInstanceState.UP_FOR_RESCHEDULE:
       tr_start_date = session.scalar(
           TR.stmt_for_task_instance(ti, descending=False).with_only_columns(TR.start_date).limit(1)
       )
       if tr_start_date:
           ti.start_date = tr_start_date
   ```
   
   That guard is gated on `ti.state == UP_FOR_RESCHEDULE`, which never holds at 
the time the worker runs the check in the normal multi-scheduler flow: the 
scheduler transitions `UP_FOR_RESCHEDULE → QUEUED` before the worker picks up 
the task. By the time the worker calls 
`_check_and_change_state_before_execution`, `ti.refresh_from_db()` returns 
`QUEUED`, so the lookup is skipped and `start_date` is reset.
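
To make the ordering problem concrete, here is a minimal standalone sketch that copies only the branch structure of the guard quoted above. `FakeTI`, `State`, and `legacy_guard` are hypothetical stand-ins written for this illustration, not Airflow classes, and the `TaskReschedule` lookup is replaced by a plain argument.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Optional


class State(str, Enum):
    UP_FOR_RESCHEDULE = "up_for_reschedule"
    QUEUED = "queued"


@dataclass
class FakeTI:
    """Stand-in for a task instance; not the real Airflow model."""

    state: State
    start_date: Optional[datetime] = None
    next_method: Optional[str] = None


def legacy_guard(ti, first_reschedule_start, now):
    """Mirror the branch structure of the legacy guard (lookup passed in)."""
    ti.start_date = ti.start_date if ti.next_method else now
    if ti.state == State.UP_FOR_RESCHEDULE and first_reschedule_start:
        ti.start_date = first_reschedule_start
    return ti.start_date


first_poke = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
later_poke = first_poke + timedelta(minutes=30)

# The scheduler has already moved the TI to QUEUED, so the lookup is skipped
# and start_date is reset to the time of the later poke.
queued = FakeTI(state=State.QUEUED, start_date=first_poke)
result_when_queued = legacy_guard(queued, first_poke, later_poke)

# Only if the worker still observed UP_FOR_RESCHEDULE would the value survive.
rescheduled = FakeTI(state=State.UP_FOR_RESCHEDULE, start_date=first_poke)
result_when_rescheduled = legacy_guard(rescheduled, first_poke, later_poke)
```

In this model the guard only helps in the second case, which is the case the worker does not see in the flow described above.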
   
   The net effect is that `dagrun.first_task_scheduling_delay` reports 
near-zero for any DAG whose first task is a reschedule-mode sensor, because the 
task's `start_date` reflects the last poke rather than the first, and the 
metric measures `start_date - dag_run.queued_at`. A sensor that waited 30 
minutes shows up as "started instantly."
   
   ### What you think should happen instead?
   
   The `ti_run` endpoint should treat a rescheduled task the same way it treats 
a deferred task: if the task has prior `TaskReschedule` rows, restore 
`start_date` from the first one in the current TI lifecycle instead of 
accepting the supervisor's `utcnow()`.
   
   `prepare_db_for_next_try` 
(`airflow-core/src/airflow/models/taskinstance.py:973-979`) deletes all 
`TaskReschedule` rows for the previous try's `ti.id` and assigns a new UUID, so 
rows with `ti_id == current task_instance_id` belong to the current try and do 
not need additional try_number scoping.
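
The proposed behavior could look roughly like the sketch below. It is plain Python over dictionaries rather than the real endpoint code: `resolve_start_date`, its parameters, and the row shape are assumptions made for illustration, and the actual change would query `TaskReschedule` through the session instead of filtering a list.

```python
from datetime import datetime, timedelta, timezone


def resolve_start_date(payload, next_kwargs, reschedule_rows, ti_id):
    """Return the fields to persist for a ti_run call (hypothetical helper)."""
    data = dict(payload)  # copy so the caller's payload is left untouched
    if next_kwargs:
        # Existing behavior for deferred tasks: keep the stored start_date.
        data.pop("start_date", None)
        return data
    # Proposed: rows carrying the current ti_id belong to the current try,
    # so the earliest one marks the first poke.
    starts = [row["start_date"] for row in reschedule_rows if row["ti_id"] == ti_id]
    if starts:
        data["start_date"] = min(starts)
    return data


t0 = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
rows = [
    {"ti_id": "current", "start_date": t0},
    {"ti_id": "current", "start_date": t0 + timedelta(seconds=10)},
    {"ti_id": "other", "start_date": t0 - timedelta(hours=1)},
]
payload = {"state": "running", "start_date": t0 + timedelta(seconds=20)}

pinned = resolve_start_date(payload, None, rows, "current")  # rescheduled TI
fresh = resolve_start_date(payload, None, [], "current")  # first poke, no rows
deferred = resolve_start_date(payload, {"event": "x"}, rows, "current")
```

With no prior rows the supervisor's value is accepted unchanged, so the first poke still sets `start_date` as it does today.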
   
   ### How to reproduce
   
   ```python
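   import os
   
   from airflow.providers.standard.sensors.python import PythonSensor
   from airflow.sdk import DAG
   
   def condition_eventually_true():
       # Placeholder condition (assumption): succeeds once this marker file exists.
       return os.path.exists("/tmp/ready")
   
   with DAG(dag_id="reschedule_start_date_repro", schedule=None):
       PythonSensor(
           task_id="poll",
           mode="reschedule",
           poke_interval=10,
           timeout=300,
           python_callable=condition_eventually_true,
       )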
   ```
   
   Compare `ti.start_date` across pokes: it advances on every reschedule. 
After the fix, it stays pinned at the first poke.
   
   ### Operating System
   
   n/a (logic bug)
   
   ### Versions of Apache Airflow Providers
   
   n/a
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   Reproducible on `main`.
   
   ### Are you willing to submit PR?
   
   Yes, willing to submit a PR.
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's Code of Conduct

