asherkhb-ktx opened a new issue, #35355:
URL: https://github.com/apache/airflow/issues/35355

   ### Apache Airflow version
   
   2.7.2
   
   ### What happened
   
   When dynamic task mapping is applied to a task_group() using an XCom stored 
under a non-default key (i.e. any key other than return_value), the group 
expands to n=len(return_value) instances instead of n=len(specified_key). 
   
   Mapped instances whose index is valid for the specified_key value 
succeed, but:
   * if len(return_value) < len(specified_key), not all expected tasks are run; 
or
   * if len(return_value) > len(specified_key), the extra tasks fail with 
"IndexError: list index out of range".
   
   ### What you think should happen instead
   
   The task_group should expand over the length of the XCom value stored under 
the specified key, not over the length of return_value.
   
   ### How to reproduce
   
   Consider the following DAG. You would expect `greet_by_name` to run three 
times (once per entry in `names`), but it runs only twice, matching the two 
keys of the returned dict. 
   
   ```python
   from airflow import DAG
   from airflow.decorators import task, task_group
   import pendulum
   
   with DAG(
       'Test_Dynamic',
       start_date=pendulum.datetime(2022, 1, 1, tz='UTC'),
       schedule=None,  # `schedule` replaces the deprecated `schedule_interval`
       catchup=False,
   ) as dag:
       
       @task(multiple_outputs=True)
       def get_greetings():
           return {
               'salutation': 'Hello',
               'names': ['John', 'Jane', 'Jack'],
           }
       
       greetings = get_greetings()
   
       @task_group()
       def greet_by_name(name, salutation):
   
           @task()
           def format_greeting(name, salutation):
               return f'{salutation}, {name}'
   
           @task()
           def print_message(msg):
               print(msg)
           
           print_message(format_greeting(name, salutation))
   
   
       greet_everyone = greet_by_name.partial(
           salutation=greetings['salutation']
       ).expand(
           name=greetings['names']
       )
   
   ```
   
   If additional outputs are added to the `get_greetings` task, the 
task_group expands accordingly (e.g. to 4 instances for the four keys below), 
but the instances beyond the length of `names` fail:
   
   ```python
       @task(multiple_outputs=True)
       def get_greetings():
           return {
               'salutation': 'Hello',
               'names': ['John', 'Jane', 'Jack'],
               'extra_key_1': 'extra_value_1',
               'extra_key_2': 'extra_value_2',
           }
   ```
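
   The instance counts in both examples can be reproduced with a small plain-Python model of the symptom. This is only a sketch of the behaviour described in this report, not Airflow's actual expansion code; `simulate_expansion` is a hypothetical helper, and it assumes the instance count comes from the whole returned dict while each instance indexes into the value under the requested key.

```python
# Plain-Python model of the reported symptom; NOT Airflow's implementation.
# `simulate_expansion` is a hypothetical helper written for illustration.

def simulate_expansion(return_value, key):
    """Return one outcome per mapped instance, as described in the report.

    Assumption: the number of instances is len(return_value) (the whole
    dict), while each instance reads return_value[key][map_index].
    """
    values = return_value[key]
    outcomes = []
    for map_index in range(len(return_value)):  # reported (wrong) length
        try:
            outcomes.append(values[map_index])
        except IndexError as exc:
            outcomes.append(f"IndexError: {exc}")
    return outcomes


two_keys = {"salutation": "Hello", "names": ["John", "Jane", "Jack"]}
four_keys = {
    **two_keys,
    "extra_key_1": "extra_value_1",
    "extra_key_2": "extra_value_2",
}

observed_two = simulate_expansion(two_keys, "names")
observed_four = simulate_expansion(four_keys, "names")
expected_count = len(two_keys["names"])  # what the expansion should use

print(observed_two)    # ['John', 'Jane']
print(observed_four)   # ['John', 'Jane', 'Jack', 'IndexError: list index out of range']
print(expected_count)  # 3
```

   With two keys, the third name is never processed; with four keys, the fourth instance has no matching name and fails.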
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

