zacyu opened a new issue, #49376:
URL: https://github.com/apache/airflow/issues/49376

   ### Apache Airflow version
   
   2.10.5
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   When a DAG runs a dynamically mapped task group (specifically via 
`expand_kwargs`) and an argument that has a default value is not passed 
explicitly, the task inside the group fails with a `KeyError`. The log shows:
   > {abstractoperator.py:783} ERROR - Exception rendering Jinja template for 
task '<task_group_name>.<task_name>', field 'op_kwargs'. Template: {...
   
   ### What you think should happen instead?
   
   The task should pick up the default values defined for the task group and 
execute successfully.
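
To make the expected behaviour concrete, here is a minimal plain-Python sketch (no Airflow imports) of what "picking up the defaults" would amount to: every `expand_kwargs` entry is completed with the defaults declared in the function signature. The helper `fill_signature_defaults` and the stand-in function `group_body` are hypothetical names used for illustration only and are not part of Airflow. Pre-filling the entries like this before calling `expand_kwargs` might also serve as a stopgap, but that has not been verified here.

```python
import inspect
from typing import Any, Callable


def fill_signature_defaults(
    func: Callable[..., Any],
    kwargs_list: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Hypothetical helper: complete each kwargs dict with func's declared defaults."""
    defaults = {
        name: param.default
        for name, param in inspect.signature(func).parameters.items()
        if param.default is not inspect.Parameter.empty
    }
    # Explicitly supplied values take precedence over signature defaults.
    return [{**defaults, **kwargs} for kwargs in kwargs_list]


def group_body(
    *,
    str_param_1: str,
    str_param_2_with_default: str = "default",
    int_param_3: int,
) -> None:
    """Stand-in with the same signature as the task group in the reproduction."""


filled = fill_signature_defaults(
    group_body,
    [{"int_param_3": 621}, {"int_param_3": 926}],
)
```

With the entries from the reproduction, each dict in `filled` carries `str_param_2_with_default` alongside `int_param_3`, so a per-key lookup on either name would find a value.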
   
   ### How to reproduce
   
   Reproduce with the following DAG:
   
   ```python
   from datetime import datetime
   
   from airflow.decorators import dag, task, task_group
   
   
   @task
   def _multi_param_task(
       *,
       str_param_1: str,
       str_param_2_with_default: str = "default",
       int_param_3: int,
   ) -> dict[str, str | int]:
       return {
           "param_1": str_param_1,
           "param_2": str_param_2_with_default,
           "param_3": int_param_3,
       }
   
   @task_group
   def _multi_param_task_group(
       *,
       str_param_1: str,
       str_param_2_with_default: str = "default",
       int_param_3: int,
   ) -> dict[str, str]:
       return _multi_param_task(
           str_param_1=str_param_1,
           str_param_2_with_default=str_param_2_with_default,
           int_param_3=int_param_3,
       )
   
   
   @dag(
       start_date=datetime(2025, 1, 1),
       schedule_interval=None,
   )
   def dynamic_task_dag() -> None:
       _multi_param_task.partial(
           str_param_1="task group dynamically-mapped with expand_kwargs",
       ).expand_kwargs(
           [
               {"int_param_3": 412},
               {"int_param_3": 345},
           ]
       )
   
       _multi_param_task_group.partial(
           str_param_1="task group dynamically-mapped with expand_kwargs",
       ).expand_kwargs(
           [
               {"int_param_3": 621},
               {"int_param_3": 926},
           ]
       )
   
   
   # Instantiate the DAG so that Airflow registers it.
   dynamic_task_dag()
   ```
   
   The dynamically mapped task (first call) runs successfully, but the 
dynamically mapped task group (second call) fails with the error quoted above. 
Screenshot and full log:
   
   <img width="999" alt="error on _multi_param_task [] map index 0" 
src="https://github.com/user-attachments/assets/d9dd8d47-f9f7-46a6-9d21-2918c3eaef6b" 
/>
   
   ```
   [2025-04-16, 16:18:59 EDT] {abstractoperator.py:783} ERROR - Exception 
rendering Jinja template for task '_multi_param_task_group._multi_param_task', 
field 'op_kwargs'. Template: {'str_param_1': 'task group dynamic invocation via 
expand_kwargs', 'str_param_2_with_default': 
MappedArgument(_input=ListOfDictsExpandInput(value=[{'int_param_3': 621}, 
{'int_param_3': 926}]), _key='str_param_2_with_default'), 'int_param_3': 
MappedArgument(_input=ListOfDictsExpandInput(value=[{'int_param_3': 621}, 
{'int_param_3': 926}]), _key='int_param_3')}
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/abstractoperator.py",
 line 775, in _do_render_template_fields
       rendered_content = self.render_template(
                          ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/template/templater.py",
 line 185, in render_template
       return {k: self.render_template(v, context, jinja_env, oids) for k, v in 
value.items()}
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/template/templater.py",
 line 175, in render_template
       return value.resolve(context, include_xcom=True)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", 
line 97, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/expandinput.py",
 line 74, in resolve
       return data[self._key]
              ~~~~^^^^^^^^^^^
   KeyError: 'str_param_2_with_default'
   [2025-04-16, 16:18:59 EDT] {taskinstance.py:3313} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py",
 line 274, in _run_raw_task
       TaskInstance._execute_task_with_callbacks(
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py",
 line 3117, in _execute_task_with_callbacks
       task_orig = self.render_templates(context=context, jinja_env=jinja_env)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py",
 line 3540, in render_templates
       original_task.render_template_fields(context, jinja_env)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py",
 line 1442, in render_template_fields
       self._do_render_template_fields(self, self.template_fields, context, 
jinja_env, set())
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/abstractoperator.py",
 line 775, in _do_render_template_fields
       rendered_content = self.render_template(
                          ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/template/templater.py",
 line 185, in render_template
       return {k: self.render_template(v, context, jinja_env, oids) for k, v in 
value.items()}
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/template/templater.py",
 line 175, in render_template
       return value.resolve(context, include_xcom=True)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", 
line 97, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/expandinput.py",
 line 74, in resolve
       return data[self._key]
              ~~~~^^^^^^^^^^^
   KeyError: 'str_param_2_with_default'
   ```
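
Reading the traceback, the failure comes from the line `return data[self._key]`: the template for `op_kwargs` holds one per-key reference for every task group parameter, including `str_param_2_with_default`, but the `expand_kwargs` entries only contain `int_param_3`. The following plain-Python sketch mimics that lookup outside Airflow. `KeyLookup` is a hypothetical stand-in written for this illustration, not the real `MappedArgument` class, and it assumes the resolved data is simply the dict for the current map index.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class KeyLookup:
    """Hypothetical stand-in for a per-key reference into one expand_kwargs entry."""

    key: str

    def resolve(self, data: dict[str, Any]) -> Any:
        # Mirrors the `return data[self._key]` line shown in the traceback:
        # a plain dict lookup with no fallback to a signature default.
        return data[self.key]


# One entry from the reproduction's expand_kwargs list (map index 0).
entry = {"int_param_3": 621}

# The key that was supplied resolves normally.
resolved_int = KeyLookup("int_param_3").resolve(entry)

# The parameter that only has a default is absent from the entry.
try:
    KeyLookup("str_param_2_with_default").resolve(entry)
    missing_key = None
except KeyError as exc:
    missing_key = exc.args[0]
```

In this sketch `missing_key` ends up naming the same parameter as the `KeyError` in the log, which is consistent with the default never being consulted during rendering.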
   
   ### Operating System
   
   Amazon Linux 2 (ECS host)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

