kaxil commented on code in PR #67867:
URL: https://github.com/apache/airflow/pull/67867#discussion_r3391451294
##########
airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##########
@@ -166,6 +166,7 @@ class TIRescheduleStatePayload(StrictBaseModel):
]
reschedule_date: UtcDateTime
end_date: UtcDateTime
+ rendered_map_index: str | None = None
Review Comment:
The PR description lists an `AddRenderedMapIndexToReschedulePayload`
migration in `v2026_06_30.py`, but I don't see it in the diff. New payload
fields need a Cadwyn `VersionChange` so older API version schemas stay accurate
-- `AddRenderedMapIndexField` in `v2025_04_28.py` is the precedent for exactly
this field on the other payloads. Since 2026-06-30 isn't released yet,
`schema(TIRescheduleStatePayload).field("rendered_map_index").didnt_exist` in
that file should do it. (Static checks being green doesn't cover this btw: the
`check-execution-api-versions` prek hook skips the schema diff when run with
`--all-files`.)
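
To make the reasoning concrete, here is a toy model of what a "didn't exist" entry buys: the head schema carries the new field, and every older version's schema is derived by removing fields introduced later. This is not Cadwyn's API. `FieldDidntExist`, `CHANGES`, and `fields_for_version` are hypothetical names, and the field set only lists what is visible in the hunk above.

```python
# Toy model only: NOT Cadwyn's API. All names below are hypothetical.
from dataclasses import dataclass

# Head (newest) schema, limited to the fields visible in the diff hunk.
HEAD_RESCHEDULE_FIELDS = {"reschedule_date", "end_date", "rendered_map_index"}


@dataclass(frozen=True)
class FieldDidntExist:
    """Records that `field` was absent before API version `introduced_in`."""

    field: str
    introduced_in: str  # ISO date string, e.g. "2026-06-30"


CHANGES = [FieldDidntExist("rendered_map_index", introduced_in="2026-06-30")]


def fields_for_version(version: str) -> set[str]:
    """Start from the head schema and drop fields newer than `version`."""
    fields = set(HEAD_RESCHEDULE_FIELDS)
    for change in CHANGES:
        # ISO dates compare correctly as plain strings.
        if version < change.introduced_in:
            fields.discard(change.field)
    return fields
```

If the entry in `CHANGES` is left out, `fields_for_version("2025-04-28")` still reports `rendered_map_index`, which is the inaccuracy the missing migration would cause in the older schemas.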
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1513,7 +1513,9 @@ def _on_term(signum, frame):
log.info("::group::Post Execute")
log.info("Rescheduling task, marking task as UP_FOR_RESCHEDULE")
msg = RescheduleTask(
- reschedule_date=reschedule.reschedule_date, end_date=datetime.now(tz=timezone.utc)
+ reschedule_date=reschedule.reschedule_date,
+ end_date=datetime.now(tz=timezone.utc),
+ rendered_map_index=ti.rendered_map_index,
Review Comment:
Worth stating in the description: on main this doesn't change what users
see. #57208 (shipped in 3.2.0) already persists the label on this path: the
`except Exception` block above re-renders and sends `SetRenderedMapIndex`
(`AirflowRescheduleException` is an `Exception`), and the supervisor writes it
to the DB before the reschedule call goes out. That's why the issue reproduces
on your 3.1.2 build but not on main, as kevinhongzl found. Carrying the value
in the reschedule payload itself is still reasonable for consistency with the
other state payloads, but #67521 as reported only affects 3.0.x-3.1.x, so
actually fixing it for affected users is a backport question rather than this merge.
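
A minimal sketch of the ordering argument, using stand-in classes rather than the real task runner. The nested `try` layout and the `run` helper are assumptions made for illustration; only the exception hierarchy and the two message names come from the comment above.

```python
# Stand-in class; the real one lives in Airflow.
class AirflowRescheduleException(Exception):
    """Stand-in for the exception a reschedule-mode sensor raises."""


def run(sent: list[str]) -> None:
    """Hypothetical, heavily simplified shape of the runner's error handling."""
    try:
        try:
            raise AirflowRescheduleException("poke returned False")
        except Exception:
            # The generic handler also catches the reschedule exception,
            # so the rendered label is sent first.
            sent.append("SetRenderedMapIndex")
            raise
    except AirflowRescheduleException:
        # Only afterwards does the reschedule message go out.
        sent.append("RescheduleTask")


messages: list[str] = []
run(messages)
print(messages)  # ['SetRenderedMapIndex', 'RescheduleTask']
```

Because the label message precedes the reschedule message, the stored value is already in place by the time the reschedule state update arrives.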
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -722,7 +722,10 @@ def _create_ti_state_update_query_and_update_state(
query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
# clear the next_method and next_kwargs so that none of the retries pick them up
updated_state = TaskInstanceState.UP_FOR_RESCHEDULE
- query = query.values(state=updated_state, next_method=None, next_kwargs=None)
+ reschedule_values: dict[str, Any] = {"state": updated_state, "next_method": None, "next_kwargs": None}
+ if ti_patch_payload.rendered_map_index is not None:
Review Comment:
Nothing exercises this write. Can you extend
`test_ti_update_state_to_reschedule` in
`airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py`
to send `rendered_map_index` in the payload and assert it lands on the TI? The
new SDK test only checks the message construction on the worker side.
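
For reference, the behaviour that test should pin down, reduced to a plain function. `build_reschedule_values` is a hypothetical stand-in, and it assumes the body of the `if` adds the label to `reschedule_values`, which the hunk does not show. The real test would go through the API client and DB fixtures, which are not reproduced here.

```python
from typing import Any, Optional


def build_reschedule_values(rendered_map_index: Optional[str]) -> dict[str, Any]:
    """Plain-function stand-in for the values dict built in the hunk above."""
    values: dict[str, Any] = {
        "state": "up_for_reschedule",  # stand-in for TaskInstanceState.UP_FOR_RESCHEDULE
        "next_method": None,
        "next_kwargs": None,
    }
    # Assumed behaviour: only overwrite the stored label when one was sent.
    if rendered_map_index is not None:
        values["rendered_map_index"] = rendered_map_index
    return values
```

Both branches are worth covering: a payload with a label should land it on the TI, and a payload without one should leave any existing label untouched.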
##########
task-sdk/tests/task_sdk/bases/test_sensor.py:
##########
@@ -310,6 +310,26 @@ def test_ok_with_custom_reschedule_exception(self, make_sensor, run_task):
state, _, _ = run_task(sensor)
assert state == TaskInstanceState.SUCCESS
+ def test_reschedule_includes_rendered_map_index(self, run_task, make_sensor, time_machine):
+ """Test that RescheduleTask message includes rendered_map_index when map_index_template is set."""
+ sensor = make_sensor(
+ return_value=None,
+ poke_interval=10,
+ timeout=25,
+ mode="reschedule",
+ map_index_template="{{ task.task_id }}",
+ )
+ sensor.poke = Mock(return_value=False)
+
+ date1 = timezone.utcnow()
+ time_machine.move_to(date1, tick=False)
Review Comment:
`date1` and the time freeze aren't used by any assertion. Either assert
`msg.end_date == date1` (the freeze makes that deterministic) or drop these two
lines and the `time_machine` fixture.