1fanwang opened a new issue, #66784:
URL: https://github.com/apache/airflow/issues/66784
### Apache Airflow version
main (3.x)
### What happened?
When a sensor runs in `mode="reschedule"`, the task instance's `start_date`
is overwritten on every poke instead of preserved at the first-poke value. The
supervisor sends `start_date=utcnow()` as part of the `ti_run` execution-API
payload on each re-execution, and the endpoint at
`airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py::ti_run`
writes that value through unconditionally. A guard already exists for deferred
tasks (lines 180-182: `if ti.next_kwargs: data.pop("start_date")`), but no
equivalent guard exists for rescheduled tasks.
There is a legacy guard in
`airflow-core/src/airflow/models/taskinstance.py::_check_and_change_state_before_execution`
at ~line 1315:
```python
ti.start_date = ti.start_date if ti.next_method else timezone.utcnow()
if ti.state == TaskInstanceState.UP_FOR_RESCHEDULE:
    tr_start_date = session.scalar(
        TR.stmt_for_task_instance(ti, descending=False).with_only_columns(TR.start_date).limit(1)
    )
    if tr_start_date:
        ti.start_date = tr_start_date
```
That guard is gated on `ti.state == UP_FOR_RESCHEDULE`, which never holds at
the time the worker runs the check in the normal multi-scheduler flow: the
scheduler transitions `UP_FOR_RESCHEDULE → QUEUED` before the worker picks up
the task. By the time the worker calls
`_check_and_change_state_before_execution`, `ti.refresh_from_db()` returns
`QUEUED`, so the lookup is skipped and `start_date` is reset.
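The following is a minimal, self-contained simulation of that race. It uses simplified stand-ins that mirror the shape of the legacy guard quoted above. `State`, `FakeTI` and `legacy_check` are hypothetical names, not Airflow APIs. Once the scheduler has moved the task to `QUEUED`, the reschedule lookup is skipped and `start_date` is reset to the worker's "now".

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import List, Optional


class State(str, Enum):
    # Hypothetical subset of task-instance states, for illustration only.
    UP_FOR_RESCHEDULE = "up_for_reschedule"
    QUEUED = "queued"


@dataclass
class FakeTI:
    # Hypothetical stand-in for a TaskInstance; not Airflow's model.
    state: State
    start_date: Optional[datetime] = None
    next_method: Optional[str] = None
    # start_date of each TaskReschedule row for this TI, oldest first.
    reschedule_starts: List[datetime] = field(default_factory=list)


def legacy_check(ti: FakeTI, now: datetime) -> None:
    # Same shape as the legacy guard: reset to "now" unless resuming from a
    # deferral, then restore from the first reschedule row only if the
    # state is still UP_FOR_RESCHEDULE.
    ti.start_date = ti.start_date if ti.next_method else now
    if ti.state == State.UP_FOR_RESCHEDULE and ti.reschedule_starts:
        ti.start_date = ti.reschedule_starts[0]


first_poke = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
second_poke = first_poke + timedelta(seconds=10)

ti = FakeTI(State.UP_FOR_RESCHEDULE, first_poke, reschedule_starts=[first_poke])
ti.state = State.QUEUED  # scheduler: UP_FOR_RESCHEDULE -> QUEUED
legacy_check(ti, now=second_poke)  # worker sees QUEUED, lookup skipped
print(ti.start_date == second_poke)  # True: first-poke start_date lost
```

The same call on a TI still in `UP_FOR_RESCHEDULE` would restore `first_poke`. This shows that the lookup itself is fine and only the state gate misfires.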
The net effect is that `dagrun.first_task_scheduling_delay` reports
near-zero for any DAG whose first task is a reschedule-mode sensor. The metric
measures `start_date - dag_run.queued_at`, and the task's `start_date` reflects
the last poke rather than the first. A sensor that waited 30 minutes shows up
as "started instantly."
### What you think should happen instead?
The `ti_run` endpoint should treat a rescheduled task the same way it treats
a deferred task: if the task has prior `TaskReschedule` rows, restore
`start_date` from the first one in the current TI lifecycle instead of
accepting the supervisor's `utcnow()`.
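Here is a minimal sketch of that decision as a pure function. It assumes the payload is a plain dict of columns to write and that the reschedule lookup is injected as a callable. `apply_start_date_guards` and its parameters are hypothetical names, not the actual `ti_run` code. Only the `next_kwargs` branch mirrors the existing deferred guard. The reschedule branch is the proposed addition.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Dict, Optional


def apply_start_date_guards(
    data: Dict[str, Any],
    next_kwargs: Optional[Dict[str, Any]],
    first_reschedule_start: Callable[[], Optional[datetime]],
) -> Dict[str, Any]:
    # `data` stands in for the columns the endpoint is about to write.
    # `first_reschedule_start` stands in for a query returning the earliest
    # TaskReschedule.start_date for the current ti_id (None if there is none).
    data = dict(data)  # work on a copy; leave the caller's payload untouched
    if next_kwargs:
        # Existing deferred-task guard: keep the stored start_date.
        data.pop("start_date", None)
    else:
        restored = first_reschedule_start()
        if restored is not None:
            # Proposed reschedule guard: pin start_date to the first poke.
            data["start_date"] = restored
    return data


first_poke = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
now = first_poke + timedelta(minutes=30)
payload = {"state": "running", "start_date": now}

print(apply_start_date_guards(payload, None, lambda: first_poke)["start_date"])
# 2024-01-01 12:00:00+00:00
print(apply_start_date_guards(payload, None, lambda: None)["start_date"] == now)
# True: first execution, no reschedule rows yet
print("start_date" in apply_start_date_guards(payload, {"k": 1}, lambda: first_poke))
# False: deferred path keeps the stored value
```

Passing the lookup as a callable means the extra query only runs when the deferred guard does not already apply.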
`prepare_db_for_next_try`
(`airflow-core/src/airflow/models/taskinstance.py:973-979`) deletes all
`TaskReschedule` rows for the previous try's `ti.id` and assigns a new UUID, so
rows with `ti_id == current task_instance_id` belong to the current try and do
not need additional try_number scoping.
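The next block is a minimal in-memory model of that argument, with rows as plain objects rather than ORM rows. `RescheduleRow`, `prepare_for_next_try` and `first_reschedule_start` are hypothetical names. They only mimic the behavior described here and are not Airflow's implementation. Once the old rows are gone and the id has changed, filtering on `ti_id` alone is enough to isolate the current try.

```python
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple


@dataclass
class RescheduleRow:
    # Hypothetical stand-in for a TaskReschedule row.
    ti_id: uuid.UUID
    start_date: datetime


def prepare_for_next_try(
    rows: List[RescheduleRow], old_ti_id: uuid.UUID
) -> Tuple[List[RescheduleRow], uuid.UUID]:
    # Models the two effects described above: delete the previous try's
    # reschedule rows and hand out a fresh TI id.
    return [r for r in rows if r.ti_id != old_ti_id], uuid.uuid4()


def first_reschedule_start(rows: List[RescheduleRow], ti_id: uuid.UUID) -> Optional[datetime]:
    # Filter on ti_id alone; no try_number column is consulted.
    return min((r.start_date for r in rows if r.ti_id == ti_id), default=None)


t0 = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
try1_id = uuid.uuid4()
rows = [RescheduleRow(try1_id, t0), RescheduleRow(try1_id, t0 + timedelta(seconds=10))]

rows, try2_id = prepare_for_next_try(rows, try1_id)
print(first_reschedule_start(rows, try2_id))  # None: try 2 starts with no rows
rows.append(RescheduleRow(try2_id, t0 + timedelta(minutes=5)))
print(first_reschedule_start(rows, try2_id) == t0 + timedelta(minutes=5))  # True
```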
### How to reproduce
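```python
from pathlib import Path
from airflow.providers.standard.sensors.python import PythonSensor
from airflow.sdk import DAG


def condition_eventually_true() -> bool:
    # Stand-in condition: false until `touch /tmp/sensor_ready` runs on the worker.
    return Path("/tmp/sensor_ready").exists()


with DAG(dag_id="reschedule_start_date_repro", schedule=None):
    PythonSensor(
        task_id="poll",
        mode="reschedule",
        poke_interval=10,
        timeout=300,
        python_callable=condition_eventually_true,
    )
```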
Compare `ti.start_date` across pokes: it advances on every reschedule.
After the fix, it stays pinned at the first poke.
### Operating System
n/a (logic bug)
### Versions of Apache Airflow Providers
n/a
### Deployment
Other
### Deployment details
Reproducible on `main`.
### Are you willing to submit PR?
Yes, willing to submit a PR.
### Code of Conduct
- [X] I agree to follow this project's Code of Conduct