sc-anssi opened a new issue, #35361: URL: https://github.com/apache/airflow/issues/35361
### Apache Airflow version

2.7.2

### What happened

Trying to map over a dict instead of a list results in a `KeyError` (the following backtrace is produced when running the DAG below with Python 3.11):

```
[2023-11-01T23:37:35.428+0100] {dag.py:3966} INFO - dagrun id: test
[2023-11-01T23:37:35.438+0100] {dag.py:3982} INFO - created dagrun <DagRun test @ 2023-11-01T22:37:35.362926+00:00: manual__2023-11-01T22:37:35.362926+00:00, state:running, queued_at: None. externally triggered: False>
[2023-11-01T23:37:35.442+0100] {dag.py:3930} INFO - *****************************************************
[2023-11-01T23:37:35.443+0100] {dag.py:3934} INFO - Running task get_dict
[2023-11-01 23:37:35,509] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test' AIRFLOW_CTX_TASK_ID='get_dict' AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T22:37:35.362926+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T22:37:35.362926+00:00'
[2023-11-01T23:37:35.509+0100] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test' AIRFLOW_CTX_TASK_ID='get_dict' AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T22:37:35.362926+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T22:37:35.362926+00:00'
[2023-11-01 23:37:35,511] {python.py:194} INFO - Done. Returned value was: {'key1': ('item1', 'item2'), 'key2': ('item3', 'item4')}
[2023-11-01T23:37:35.511+0100] {python.py:194} INFO - Done. Returned value was: {'key1': ('item1', 'item2'), 'key2': ('item3', 'item4')}
[2023-11-01 23:37:35,533] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=test, task_id=get_dict, execution_date=20231101T223735, start_date=, end_date=20231101T223735
[2023-11-01T23:37:35.533+0100] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=test, task_id=get_dict, execution_date=20231101T223735, start_date=, end_date=20231101T223735
[2023-11-01T23:37:35.552+0100] {dag.py:3938} INFO - get_dict ran successfully!
[2023-11-01T23:37:35.553+0100] {dag.py:3941} INFO - *****************************************************
[2023-11-01T23:37:35.572+0100] {dag.py:3930} INFO - *****************************************************
[2023-11-01T23:37:35.572+0100] {dag.py:3934} INFO - Running task print_msg
[2023-11-01 23:37:35,595] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode, session=session)
  File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
    original_task.render_template_fields(context)
  File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
    mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/decorators/base.py", line 528, in _expand_mapped_kwargs
    op_kwargs, resolved_oids = super()._expand_mapped_kwargs(context, session)
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 584, in _expand_mapped_kwargs
    return self._get_specified_expand_input().resolve(context, session)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/expandinput.py", line 200, in resolve
    data = {k: self._expand_mapped_field(k, v, context, session=session) for k, v in self.value.items()}
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/expandinput.py", line 200, in <dictcomp>
    data = {k: self._expand_mapped_field(k, v, context, session=session) for k, v in self.value.items()}
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/expandinput.py", line 184, in _expand_mapped_field
    return value[found_index]
           ~~~~~^^^^^^^^^^^^^
  File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/xcom_arg.py", line 455, in __getitem__
    value = self.value[index]
            ~~~~~~~~~~^^^^^^^
KeyError: 0
[...]
```

### What you think should happen instead

Just as it is possible to `expand()` over dict data, it should be possible to `map()` over dict data as well. I believe mapped tasks are passed a `(k, v)` tuple as argument when using `expand()` over a dict, so it seems reasonable to expect the same from the `map()` method. According to the [documentation](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html#transforming-expanding-data), the mapped data should be iterable (and dicts are), and according to the [code](https://github.com/apache/airflow/blob/2.7.2/airflow/models/xcom_arg.py#L450), `dict` is a valid type for the `_MapResult` value.
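The `KeyError: 0` at the bottom of the traceback can be reproduced in plain Python, without Airflow: the map result's `__getitem__` looks the value up by positional index, which works for a list but not for a dict. A minimal illustration (not Airflow code):

```python
# The dict returned by get_dict() in the reproduction below.
data = {"key1": ("item1", "item2"), "key2": ("item3", "item4")}

# What the resolver effectively attempts: positional lookup on a dict.
try:
    data[0]
except KeyError as exc:
    print(f"KeyError: {exc}")  # KeyError: 0 -- dicts are indexed by key, not position

# A positional view over the same dict, as (key, value) pairs:
pairs = list(data.items())
print(pairs[0])  # ('key1', ('item1', 'item2'))
```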
### How to reproduce

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task


def dummy(arg):
    return arg


with DAG(
    "test",
    start_date=datetime(2021, 5, 1),
    catchup=False,
) as dag:

    @task
    def get_dict():
        return {
            "key1": ("item1", "item2"),
            "key2": ("item3", "item4"),
        }

    @task
    def get_list():
        return ["item1", "item2"]

    @task
    def print_msg(arg):
        print(arg)

    # Works with a list
    # print_msg.expand(arg=get_list().map(dummy))

    # Does not work with a dict
    print_msg.expand(arg=get_dict().map(dummy))


if __name__ == "__main__":
    dag.test()
```

### Operating System

Ubuntu 23.04

### Versions of Apache Airflow Providers

_No response_

### Deployment

Virtualenv installation

### Deployment details

_No response_

### Anything else

It is possible to work around the issue with the following (ugly) patch, which will probably break something else somewhere:

```diff
diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py
index a19aa6703f..c87d522771 100644
--- a/airflow/models/xcom_arg.py
+++ b/airflow/models/xcom_arg.py
@@ -452,7 +452,10 @@ class _MapResult(Sequence):
         self.callables = callables
 
     def __getitem__(self, index: Any) -> Any:
-        value = self.value[index]
+        if isinstance(self.value, dict):
+            value = list(self.value.items())[index]
+        else:
+            value = self.value[index]
         # In the worker, we can access all actual callables. Call them.
         callables = [f for f in self.callables if callable(f)]
```

Running the same DAG with this patch gives the following trace:

```
[2023-11-02T00:14:34.731+0100] {dag.py:3966} INFO - dagrun id: test
[2023-11-02T00:14:34.740+0100] {dag.py:3982} INFO - created dagrun <DagRun test @ 2023-11-01T23:14:34.665846+00:00: manual__2023-11-01T23:14:34.665846+00:00, state:running, queued_at: None. externally triggered: False>
[2023-11-02T00:14:34.744+0100] {dag.py:3930} INFO - *****************************************************
[2023-11-02T00:14:34.744+0100] {dag.py:3934} INFO - Running task get_dict
[2023-11-02 00:14:34,799] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test' AIRFLOW_CTX_TASK_ID='get_dict' AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T23:14:34.665846+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T23:14:34.665846+00:00'
[2023-11-02T00:14:34.799+0100] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test' AIRFLOW_CTX_TASK_ID='get_dict' AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T23:14:34.665846+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T23:14:34.665846+00:00'
[2023-11-02 00:14:34,801] {python.py:194} INFO - Done. Returned value was: {'key1': ('item1', 'item2'), 'key2': ('item3', 'item4')}
[2023-11-02T00:14:34.801+0100] {python.py:194} INFO - Done. Returned value was: {'key1': ('item1', 'item2'), 'key2': ('item3', 'item4')}
[2023-11-02 00:14:34,820] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=test, task_id=get_dict, execution_date=20231101T231434, start_date=, end_date=20231101T231434
[2023-11-02T00:14:34.820+0100] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=test, task_id=get_dict, execution_date=20231101T231434, start_date=, end_date=20231101T231434
[2023-11-02T00:14:34.837+0100] {dag.py:3938} INFO - get_dict ran successfully!
[2023-11-02T00:14:34.838+0100] {dag.py:3941} INFO - *****************************************************
[2023-11-02T00:14:34.852+0100] {dag.py:3930} INFO - *****************************************************
[2023-11-02T00:14:34.852+0100] {dag.py:3934} INFO - Running task print_msg
[2023-11-02 00:14:34,895] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test' AIRFLOW_CTX_TASK_ID='print_msg' AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T23:14:34.665846+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T23:14:34.665846+00:00'
[2023-11-02T00:14:34.895+0100] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test' AIRFLOW_CTX_TASK_ID='print_msg' AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T23:14:34.665846+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T23:14:34.665846+00:00'
('key1', ['item1', 'item2'])
[2023-11-02 00:14:34,896] {python.py:194} INFO - Done. Returned value was: None
[2023-11-02T00:14:34.896+0100] {python.py:194} INFO - Done. Returned value was: None
[2023-11-02 00:14:34,897] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=test, task_id=print_msg, map_index=0, execution_date=20231101T231434, start_date=, end_date=20231101T231434
[2023-11-02T00:14:34.897+0100] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=test, task_id=print_msg, map_index=0, execution_date=20231101T231434, start_date=, end_date=20231101T231434
[2023-11-02T00:14:34.912+0100] {dag.py:3938} INFO - print_msg ran successfully!
[2023-11-02T00:14:34.913+0100] {dag.py:3941} INFO - *****************************************************
[2023-11-02T00:14:34.913+0100] {dag.py:3930} INFO - *****************************************************
[2023-11-02T00:14:34.913+0100] {dag.py:3932} INFO - Running task print_msg index 1
[2023-11-02 00:14:34,956] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test' AIRFLOW_CTX_TASK_ID='print_msg' AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T23:14:34.665846+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T23:14:34.665846+00:00'
[2023-11-02T00:14:34.956+0100] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test' AIRFLOW_CTX_TASK_ID='print_msg' AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T23:14:34.665846+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T23:14:34.665846+00:00'
('key2', ['item3', 'item4'])
[2023-11-02 00:14:34,956] {python.py:194} INFO - Done. Returned value was: None
[2023-11-02T00:14:34.956+0100] {python.py:194} INFO - Done. Returned value was: None
[2023-11-02 00:14:34,957] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=test, task_id=print_msg, map_index=1, execution_date=20231101T231434, start_date=, end_date=20231101T231434
[2023-11-02T00:14:34.957+0100] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=test, task_id=print_msg, map_index=1, execution_date=20231101T231434, start_date=, end_date=20231101T231434
[2023-11-02T00:14:34.971+0100] {dag.py:3938} INFO - print_msg ran successfully!
[2023-11-02T00:14:34.971+0100] {dag.py:3941} INFO - *****************************************************
[2023-11-02T00:14:34.974+0100] {dagrun.py:653} INFO - Marking run <DagRun test @ 2023-11-01T23:14:34.665846+00:00: manual__2023-11-01T23:14:34.665846+00:00, state:running, queued_at: None. externally triggered: False> successful
[2023-11-02T00:14:34.974+0100] {dagrun.py:704} INFO - DagRun Finished: dag_id=test, execution_date=2023-11-01T23:14:34.665846+00:00, run_id=manual__2023-11-01T23:14:34.665846+00:00, run_start_date=2023-11-01 23:14:34.665846+00:00, run_end_date=2023-11-01 23:14:34.974797+00:00, run_duration=0.308951, state=success, external_trigger=False, run_type=manual, data_interval_start=2023-10-31T23:14:34.665846+00:00, data_interval_end=2023-11-01T23:14:34.665846+00:00, dag_hash=None
```

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
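The positional lookup proposed in the workaround patch can be exercised outside Airflow with a minimal sketch. `MapResultSketch` below is hypothetical (only the `__getitem__` logic mirrors the patched `_MapResult`; the callable-chaining is simplified), but it shows why each mapped task instance then receives a `(key, value)` tuple, as the patched trace demonstrates:

```python
from collections.abc import Sequence
from typing import Any


class MapResultSketch(Sequence):
    """Hypothetical stand-in for Airflow's _MapResult, reduced to the
    positional lookup changed by the workaround patch."""

    def __init__(self, value, callables):
        self.value = value          # list or dict returned by the upstream task
        self.callables = callables  # functions passed to .map()

    def __getitem__(self, index: int) -> Any:
        # Patched behaviour: view a dict positionally as (key, value) pairs.
        if isinstance(self.value, dict):
            value = list(self.value.items())[index]
        else:
            value = self.value[index]
        # Apply the mapped callables in order (simplified chaining).
        for f in self.callables:
            value = f(value)
        return value

    def __len__(self) -> int:
        return len(self.value)


result = MapResultSketch(
    {"key1": ("item1", "item2"), "key2": ("item3", "item4")},
    [lambda arg: arg],  # the reproduction's dummy() callable
)
print(result[0])  # ('key1', ('item1', 'item2'))
print(result[1])  # ('key2', ('item3', 'item4'))
```

In the real patched run the values appear as lists (`('key1', ['item1', 'item2'])`) because the tuples go through XCom serialization between tasks; the plain-Python sketch keeps them as tuples.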