MageeStorm opened a new issue, #67521:
URL: https://github.com/apache/airflow/issues/67521

   ### Under which category would you file this issue?
   
   Providers
   
   ### Apache Airflow version
   
   3.0.1 - 3.1.2
   
   ### What happened and how to reproduce it?
   
   ### Apache Airflow version
   
   3.0.1 (also reproduced against task-sdk 1.0.6 through 1.2.1 and airflow-core 
3.1.0)
   
   ### What happened?
   
   When a `@task.sensor(mode="reschedule")` task sets 
`context["map_index_template"]` inside its body to give each mapped instance a 
human-readable name, the Airflow 3 UI displays the integer `map_index` (`0`, 
`1`, `2`, …) for every rescheduled attempt instead of the rendered name. The 
rendered name *does* appear after the final poke (when the sensor returns 
`is_done=True`), but every interim row shows the bare integer.
   
   The same DAG renders correctly on Airflow 2.
   
   ### What you think should happen instead?
   
   Each `up_for_reschedule` task instance row should display the rendered 
`map_index_template` value, matching Airflow 2 behavior and matching what other 
terminal/intermediate states (`success`, `failed`, `skipped`, `up_for_retry`) 
already do in Airflow 3.
   
   ### How to reproduce
   
   ```python
   from airflow.sdk import task
   
   @task
   def fan_out() -> list[dict]:
       return [{"id": "alpha"}, {"id": "beta"}, {"id": "gamma"}]
   
   @task.sensor(
       poke_interval=30,
       timeout=600,
       mode="reschedule",
       map_index_template="{{ map_index_template }}",  # value already carries the "poll_" prefix
   )
   def poll(item: dict):
       from airflow.sdk import get_current_context
       from airflow.sdk.bases.sensor import PokeReturnValue
       ctx = get_current_context()
       ctx["map_index_template"] = f"poll_{item['id']}"
       return PokeReturnValue(is_done=False)   # forces reschedule
   
   poll.expand(item=fan_out())
   ```
   
   Open the UI: while the sensor is `up_for_reschedule`, the map index column 
shows `0`, `1`, `2` instead of `poll_alpha`, `poll_beta`, `poll_gamma`.
   
   ### Root cause
   
   Three coordinated pieces in airflow-core + task-sdk all skip 
`rendered_map_index` on the reschedule path:
   
   1. **SDK doesn't render or attach the name.** In 
`airflow/sdk/execution_time/task_runner.py::run()`:
   
      ```python
      except AirflowRescheduleException as reschedule:
          log.info("Rescheduling task, marking task as UP_FOR_RESCHEDULE")
          msg = RescheduleTask(
              reschedule_date=reschedule.reschedule_date,
              end_date=datetime.now(tz=timezone.utc),
          )
          state = TaskInstanceState.UP_FOR_RESCHEDULE
      ```
   
      This handler never calls `_render_map_index(context, ti=ti, log=log)`. 
Compare with the SUCCESS, FAILED, SKIPPED, and RETRY branches in the same 
function: all of them call `_render_map_index` and propagate 
`rendered_map_index=ti.rendered_map_index` to the supervisor.
   
   2. **IPC message has no slot.** `RescheduleTask` extends 
`TIRescheduleStatePayload`, which is the only state-transition payload missing 
the field:
   
      ```python
      # airflow/api_fastapi/execution_api/datamodels/taskinstance.py
      class TIRescheduleStatePayload(StrictBaseModel):
          state: ...
          reschedule_date: UtcDateTime
          end_date: UtcDateTime
          # no rendered_map_index
      ```
   
      Other payloads (`TITerminalStatePayload`, `TISuccessStatePayload`, 
`TIDeferredStatePayload`, `TIRetryStatePayload`) all declare 
`rendered_map_index: str | None = None`.
   
   3. **API endpoint therefore cannot persist it** even if the SDK were to send 
it.
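
The gap described in items 2 and 3 can be illustrated with a toy model. This is only a sketch: the `Toy*` dataclasses and `payloads_missing_field` are hypothetical stand-ins, not the real Airflow pydantic models, and they assume the real payloads reject undeclared fields the way a plain dataclass does.

```python
from dataclasses import dataclass, fields
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class ToyRetryPayload:
    """Hypothetical stand-in for a payload that declares the field."""
    end_date: datetime
    rendered_map_index: Optional[str] = None


@dataclass(frozen=True)
class ToyReschedulePayload:
    """Hypothetical stand-in for the reschedule payload: no such field."""
    reschedule_date: datetime
    end_date: datetime


def payloads_missing_field(payload_classes, field_name="rendered_map_index"):
    """Return the names of payload classes that do not declare field_name."""
    return [
        cls.__name__
        for cls in payload_classes
        if field_name not in {f.name for f in fields(cls)}
    ]


now = datetime.now(tz=timezone.utc)
missing = payloads_missing_field([ToyRetryPayload, ToyReschedulePayload])

# With no declared slot, the name cannot even be attached to the message.
try:
    ToyReschedulePayload(reschedule_date=now, end_date=now, rendered_map_index="poll_alpha")
    attach_error = None
except TypeError as exc:
    attach_error = exc
```

A similar check over the real models (for example by inspecting pydantic's `model_fields`) might serve as a regression test so that new state-transition payloads do not silently omit the field.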
   
   ### Suggested fix
   
   1. In `TIRescheduleStatePayload`, add `rendered_map_index: str | None = 
None`, matching the other payloads.
   2. In the `except AirflowRescheduleException` handler of 
`task_runner.run()`, call `_render_map_index(...)` and pass 
`rendered_map_index=ti.rendered_map_index` on `RescheduleTask`.
   3. In the execution-api endpoint that handles the `up_for_reschedule` 
transition, persist `rendered_map_index` to `TaskInstance.rendered_map_index` 
(same pattern as the success path).
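
The three steps above can be sketched end to end with a toy model. Everything here is hypothetical and only mirrors the shape of the proposed change: `ToyRescheduleMessage`, `toy_render_map_index`, `on_reschedule`, and `persist_reschedule` are illustrative names, the regex substitution stands in for real Jinja rendering, and a plain dict stands in for the `TaskInstance` row.

```python
import re
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass(frozen=True)
class ToyRescheduleMessage:
    reschedule_date: datetime
    end_date: datetime
    rendered_map_index: Optional[str] = None  # step 1: add the missing slot


def toy_render_map_index(template: str, context: dict) -> str:
    """Replace each {{ key }} with context[key]; a stand-in for Jinja."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda m: str(context[m.group(1)]),
        template,
    )


def on_reschedule(template: str, context: dict, reschedule_date: datetime) -> ToyRescheduleMessage:
    """Step 2: render the name in the reschedule handler and attach it."""
    return ToyRescheduleMessage(
        reschedule_date=reschedule_date,
        end_date=datetime.now(tz=timezone.utc),
        rendered_map_index=toy_render_map_index(template, context),
    )


def persist_reschedule(row: dict, msg: ToyRescheduleMessage) -> dict:
    """Step 3: store the rendered name alongside the state transition."""
    updated = dict(row, state="up_for_reschedule")
    if msg.rendered_map_index is not None:
        updated["rendered_map_index"] = msg.rendered_map_index
    return updated


context = {"item_name": "poll_alpha"}  # hypothetical context key set by the task body
next_poke = datetime.now(tz=timezone.utc) + timedelta(seconds=30)
msg = on_reschedule("{{ item_name }}", context, next_poke)
row = persist_reschedule({"map_index": 0, "rendered_map_index": None}, msg)
```

Keeping the field optional with a `None` default matches how the other payloads are described above, so an older SDK that does not send the field would presumably still be accepted.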
   
   ### Versions tested
   
   - apache-airflow-core: 3.0.1, 3.1.0
   - apache-airflow-task-sdk: 1.0.6, 1.1.0, 1.2.0, 1.2.1
   - Python: 3.12
   
   
   ### What you think should happen instead?
   
   `rendered_map_index` should be populated for `up_for_reschedule` task 
instances, as in Airflow 2.
   
   ### Operating System
   
   _No response_
   
   ### Deployment
   
   None
   
   ### Apache Airflow Provider(s)
   
   _No response_
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Official Helm Chart version
   
   Not Applicable
   
   ### Kubernetes Version
   
   _No response_
   
   ### Helm Chart configuration
   
   _No response_
   
   ### Docker Image customizations
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.