sc-anssi opened a new issue, #35361:
URL: https://github.com/apache/airflow/issues/35361

   ### Apache Airflow version
   
   2.7.2
   
   ### What happened
   
   Trying to `map()` over a dict instead of a list results in a `KeyError` (the
   following traceback is produced when running the DAG below with Python 3.11):
   ```
   [2023-11-01T23:37:35.428+0100] {dag.py:3966} INFO - dagrun id: test
   [2023-11-01T23:37:35.438+0100] {dag.py:3982} INFO - created dagrun <DagRun test @ 2023-11-01T22:37:35.362926+00:00: manual__2023-11-01T22:37:35.362926+00:00, state:running, queued_at: None. externally triggered: False>
   [2023-11-01T23:37:35.442+0100] {dag.py:3930} INFO - *****************************************************
   [2023-11-01T23:37:35.443+0100] {dag.py:3934} INFO - Running task get_dict
   [2023-11-01 23:37:35,509] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test' AIRFLOW_CTX_TASK_ID='get_dict' AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T22:37:35.362926+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T22:37:35.362926+00:00'
   [2023-11-01T23:37:35.509+0100] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test' AIRFLOW_CTX_TASK_ID='get_dict' AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T22:37:35.362926+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T22:37:35.362926+00:00'
   [2023-11-01 23:37:35,511] {python.py:194} INFO - Done. Returned value was: {'key1': ('item1', 'item2'), 'key2': ('item3', 'item4')}
   [2023-11-01T23:37:35.511+0100] {python.py:194} INFO - Done. Returned value was: {'key1': ('item1', 'item2'), 'key2': ('item3', 'item4')}
   [2023-11-01 23:37:35,533] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=test, task_id=get_dict, execution_date=20231101T223735, start_date=, end_date=20231101T223735
   [2023-11-01T23:37:35.533+0100] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=test, task_id=get_dict, execution_date=20231101T223735, start_date=, end_date=20231101T223735
   [2023-11-01T23:37:35.552+0100] {dag.py:3938} INFO - get_dict ran successfully!
   [2023-11-01T23:37:35.553+0100] {dag.py:3941} INFO - *****************************************************
   [2023-11-01T23:37:35.572+0100] {dag.py:3930} INFO - *****************************************************
   [2023-11-01T23:37:35.572+0100] {dag.py:3934} INFO - Running task print_msg
   [2023-11-01 23:37:35,595] {taskinstance.py:1937} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
       self._execute_task_with_callbacks(context, test_mode, session=session)
     File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
       task_orig = self.render_templates(context=context)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
       original_task.render_template_fields(context)
     File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
       mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/decorators/base.py", line 528, in _expand_mapped_kwargs
       op_kwargs, resolved_oids = super()._expand_mapped_kwargs(context, session)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 584, in _expand_mapped_kwargs
       return self._get_specified_expand_input().resolve(context, session)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/expandinput.py", line 200, in resolve
       data = {k: self._expand_mapped_field(k, v, context, session=session) for k, v in self.value.items()}
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/expandinput.py", line 200, in <dictcomp>
       data = {k: self._expand_mapped_field(k, v, context, session=session) for k, v in self.value.items()}
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/expandinput.py", line 184, in _expand_mapped_field
       return value[found_index]
              ~~~~~^^^^^^^^^^^^^
     File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/xcom_arg.py", line 455, in __getitem__
       value = self.value[index]
               ~~~~~~~~~~^^^^^^^
   KeyError: 0
   [...]
   ```
   
   
   ### What you think should happen instead
   
   Just as it is possible to `expand()` over dict data, it should be possible
   to `map()` over dict data as well. I believe mapped tasks are passed a tuple
   `(k, v)` as their argument when using `expand()` over a dict, so it seems
   reasonable to expect the same from the `map()` method.
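
   As a plain-Python sketch of the semantics I would expect (the `transform` helper below is my own, not Airflow API), mapping over a dict would apply the callable to each `(key, value)` pair, exactly as it already does element by element for a list:

   ```python
   def transform(data, func):
       # Hypothetical sketch of how I would expect .map() to treat its input.
       if isinstance(data, dict):
           # One result per (key, value) pair, mirroring expand() over a dict.
           return [func(item) for item in data.items()]
       # Sequences are mapped element by element, as .map() does today.
       return [func(item) for item in data]


   print(transform({"key1": ("item1", "item2")}, lambda arg: arg))
   # [('key1', ('item1', 'item2'))]
   print(transform(["item1", "item2"], lambda arg: arg))
   # ['item1', 'item2']
   ```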
   
   According to the
   [documentation](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html#transforming-expanding-data)
   the data only needs to be iterable (and dicts are), and according to the
   [code](https://github.com/apache/airflow/blob/2.7.2/airflow/models/xcom_arg.py#L450)
   dict is a valid type for the `_MapResult` value.
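
   For what it's worth, the `KeyError: 0` in the traceback is consistent with `_MapResult.__getitem__` subscripting the underlying dict with the integer map index. A minimal reduction (my own, not Airflow code):

   ```python
   # The resolved XCom value is the dict returned by get_dict()...
   value = {"key1": ("item1", "item2"), "key2": ("item3", "item4")}
   # ...but map indices are integers, so the lookup fails:
   value[0]  # raises KeyError: 0, as in the traceback above
   ```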
   
   
   
   ### How to reproduce
   
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.decorators import task
   
   
   def dummy(arg):
       return arg
   
   
   with DAG(
       "test",
       start_date=datetime(2021, 5, 1),
       catchup=False,
   ) as dag:
   
       @task
       def get_dict():
           return {
               "key1": ("item1", "item2"),
               "key2": ("item3", "item4"),
           }
   
       @task
       def get_list():
           return ["item1", "item2"]
   
       @task
       def print_msg(arg):
           print(arg)
   
       # Works with list
       #print_msg.expand(arg=get_list().map(dummy))
   
       # Does not work with dict
       print_msg.expand(arg=get_dict().map(dummy))
   
   if __name__ == "__main__":
       dag.test()
   ```
   
   ### Operating System
   
   Ubuntu 23.04
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   It is possible to work around the issue with the following (admittedly ugly)
   patch, which will probably break something else somewhere:
   ```diff
   diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py
   index a19aa6703f..c87d522771 100644
   --- a/airflow/models/xcom_arg.py
   +++ b/airflow/models/xcom_arg.py
   @@ -452,7 +452,10 @@ class _MapResult(Sequence):
            self.callables = callables
    
        def __getitem__(self, index: Any) -> Any:
   -        value = self.value[index]
   +        if isinstance(self.value, dict):
   +            value = list(self.value.items())[index]
   +        else:
   +            value = self.value[index]
    
            # In the worker, we can access all actual callables. Call them.
            callables = [f for f in self.callables if callable(f)]
   ```
   
   Running the same DAG with this patch gives the following output:
   ```
   [2023-11-02T00:14:34.731+0100] {dag.py:3966} INFO - dagrun id: test
   [2023-11-02T00:14:34.740+0100] {dag.py:3982} INFO - created dagrun <DagRun test @ 2023-11-01T23:14:34.665846+00:00: manual__2023-11-01T23:14:34.665846+00:00, state:running, queued_at: None. externally triggered: False>
   [2023-11-02T00:14:34.744+0100] {dag.py:3930} INFO - *****************************************************
   [2023-11-02T00:14:34.744+0100] {dag.py:3934} INFO - Running task get_dict
   [2023-11-02 00:14:34,799] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test' AIRFLOW_CTX_TASK_ID='get_dict' AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T23:14:34.665846+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T23:14:34.665846+00:00'
   [2023-11-02T00:14:34.799+0100] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test' AIRFLOW_CTX_TASK_ID='get_dict' AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T23:14:34.665846+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T23:14:34.665846+00:00'
   [2023-11-02 00:14:34,801] {python.py:194} INFO - Done. Returned value was: {'key1': ('item1', 'item2'), 'key2': ('item3', 'item4')}
   [2023-11-02T00:14:34.801+0100] {python.py:194} INFO - Done. Returned value was: {'key1': ('item1', 'item2'), 'key2': ('item3', 'item4')}
   [2023-11-02 00:14:34,820] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=test, task_id=get_dict, execution_date=20231101T231434, start_date=, end_date=20231101T231434
   [2023-11-02T00:14:34.820+0100] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=test, task_id=get_dict, execution_date=20231101T231434, start_date=, end_date=20231101T231434
   [2023-11-02T00:14:34.837+0100] {dag.py:3938} INFO - get_dict ran successfully!
   [2023-11-02T00:14:34.838+0100] {dag.py:3941} INFO - *****************************************************
   [2023-11-02T00:14:34.852+0100] {dag.py:3930} INFO - *****************************************************
   [2023-11-02T00:14:34.852+0100] {dag.py:3934} INFO - Running task print_msg
   [2023-11-02 00:14:34,895] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test' AIRFLOW_CTX_TASK_ID='print_msg' AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T23:14:34.665846+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T23:14:34.665846+00:00'
   [2023-11-02T00:14:34.895+0100] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test' AIRFLOW_CTX_TASK_ID='print_msg' AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T23:14:34.665846+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T23:14:34.665846+00:00'
   ('key1', ['item1', 'item2'])
   [2023-11-02 00:14:34,896] {python.py:194} INFO - Done. Returned value was: None
   [2023-11-02T00:14:34.896+0100] {python.py:194} INFO - Done. Returned value was: None
   [2023-11-02 00:14:34,897] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=test, task_id=print_msg, map_index=0, execution_date=20231101T231434, start_date=, end_date=20231101T231434
   [2023-11-02T00:14:34.897+0100] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=test, task_id=print_msg, map_index=0, execution_date=20231101T231434, start_date=, end_date=20231101T231434
   [2023-11-02T00:14:34.912+0100] {dag.py:3938} INFO - print_msg ran successfully!
   [2023-11-02T00:14:34.913+0100] {dag.py:3941} INFO - *****************************************************
   [2023-11-02T00:14:34.913+0100] {dag.py:3930} INFO - *****************************************************
   [2023-11-02T00:14:34.913+0100] {dag.py:3932} INFO - Running task print_msg index 1
   [2023-11-02 00:14:34,956] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test' AIRFLOW_CTX_TASK_ID='print_msg' AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T23:14:34.665846+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T23:14:34.665846+00:00'
   [2023-11-02T00:14:34.956+0100] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test' AIRFLOW_CTX_TASK_ID='print_msg' AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T23:14:34.665846+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T23:14:34.665846+00:00'
   ('key2', ['item3', 'item4'])
   [2023-11-02 00:14:34,956] {python.py:194} INFO - Done. Returned value was: None
   [2023-11-02T00:14:34.956+0100] {python.py:194} INFO - Done. Returned value was: None
   [2023-11-02 00:14:34,957] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=test, task_id=print_msg, map_index=1, execution_date=20231101T231434, start_date=, end_date=20231101T231434
   [2023-11-02T00:14:34.957+0100] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=test, task_id=print_msg, map_index=1, execution_date=20231101T231434, start_date=, end_date=20231101T231434
   [2023-11-02T00:14:34.971+0100] {dag.py:3938} INFO - print_msg ran successfully!
   [2023-11-02T00:14:34.971+0100] {dag.py:3941} INFO - *****************************************************
   [2023-11-02T00:14:34.974+0100] {dagrun.py:653} INFO - Marking run <DagRun test @ 2023-11-01T23:14:34.665846+00:00: manual__2023-11-01T23:14:34.665846+00:00, state:running, queued_at: None. externally triggered: False> successful
   [2023-11-02T00:14:34.974+0100] {dagrun.py:704} INFO - DagRun Finished: dag_id=test, execution_date=2023-11-01T23:14:34.665846+00:00, run_id=manual__2023-11-01T23:14:34.665846+00:00, run_start_date=2023-11-01 23:14:34.665846+00:00, run_end_date=2023-11-01 23:14:34.974797+00:00, run_duration=0.308951, state=success, external_trigger=False, run_type=manual, data_interval_start=2023-10-31T23:14:34.665846+00:00, data_interval_end=2023-11-01T23:14:34.665846+00:00, dag_hash=None
   ```
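
   For anyone hitting this in the meantime, a DAG-level workaround that avoids patching Airflow (a sketch, not battle-tested) is to convert the dict into a list of items in an intermediate task before calling `map()`:

   ```python
   from airflow.decorators import task


   @task
   def to_items(d: dict) -> list:
       # Hypothetical helper: turn the dict into a list of (key, value)
       # pairs so that .map() can index it with integers as it expects.
       # Note the pairs come back as two-element lists after the XCom
       # JSON round trip, as seen in the log output above.
       return list(d.items())


   # Wiring inside the DAG from the reproduction above:
   # print_msg.expand(arg=to_items(get_dict()).map(dummy))
   ```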
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

