GitHub user dschneider-wxs created a discussion: taskgroup mapping doesn't subset list
I'm trying to implement a "depth-first" mapping as discussed in https://github.com/apache/airflow/issues/25032#issuecomment-1684926654 and https://github.com/apache/airflow/issues/40543. I modified the example to include a task that generates the list I want to map over. Since traditional operators don't have all the features of the TaskFlow API, I included the `render` task.

When I run this, I get 3 task instances each of `r`, `e`, `t`, and `l`. `render` correctly outputs one of the dicts in each mapped task instance, but `e`, `t`, and `l` print the entire list of files. I was expecting only the output of each mapped `r` task instance to be passed to the following `e`, `t`, and `l`.

Can you clarify whether this approach is possible? How else could I achieve the end result of running each item from `get_files()` through a sequence of processing tasks? Thanks.

```python
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import dag, task, task_group
from pendulum import datetime


@dag(start_date=datetime(2022, 12, 1), schedule=None, catchup=False)
def task_group_mapping_example():
    @task(multiple_outputs=False)
    def get_files():
        return [
            dict(letter="a", number=1),
            dict(letter="b", number=2),
            dict(letter="c", number=3),
        ]

    @task_group(group_id="etl")
    def etl_pipeline(file):
        @task(multiple_outputs=True)
        def render(file):
            return file

        r = render(file)
        e = BashOperator(task_id="e", bash_command=f"""echo {r}""")
        t = BashOperator(task_id="t", bash_command=f"""echo {r["letter"]}""")
        l = BashOperator(task_id="l", bash_command=f"""echo {r["number"]}""")
        r >> e >> t >> l

    etl = etl_pipeline.expand(file=get_files())
    etl


task_group_mapping_example()
```

GitHub link: https://github.com/apache/airflow/discussions/56466
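[Editor's note: one detail worth flagging in the snippet above is the use of f-strings such as `f"""echo {r}"""`. An f-string is evaluated eagerly at DAG-parse time, so it bakes in the string form of the XComArg reference object rather than the per-map-index value resolved at run time. The sketch below is a plain-Python illustration of that eager-vs-deferred distinction, not Airflow-specific code; `LazyRef` is a hypothetical stand-in for a lazy reference like an XComArg.]

```python
# Illustration only: f-strings interpolate immediately, so a lazy
# reference object is stringified at build time, not resolved later.
class LazyRef:
    """Hypothetical stand-in for a lazily-resolved reference (e.g. an XComArg)."""

    def __repr__(self):
        return "<LazyRef to render output>"


r = LazyRef()

# Eager: the object's string form is baked into the command right now.
cmd = f"echo {r}"
print(cmd)  # echo <LazyRef to render output>

# Deferred: a template string keeps the placeholder intact, to be
# rendered later by a templating engine with the runtime value.
template = "echo {{ r }}"
print(template)  # echo {{ r }}
```

The takeaway is that anything built with an f-string is fixed before any task instance runs, whereas a template string is resolved per task instance at render time.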
