EssKayz commented on issue #26619: URL: https://github.com/apache/airflow/issues/26619#issuecomment-1256048784
This should be a working example, so essentially ``` dag = DAG('mockup_example', # With this option, even when the start date is "one day ago" we instruct airflow scheduler to only create # a DAG Run for the most current instance of the DAG interval series. catchup=False, default_args=dag_args, default_view='graph', schedule_interval=None, dagrun_timeout=timedelta(minutes=30)) def gen_multiprocess_example( dag: DAG, gen_task_id, **kwargs ) -> TaskGroup: with TaskGroup(dag=dag, group_id=f'mockup_{gen_task_id}', prefix_group_id=False) as grp: task = PythonOperator( task_id=f'split_{gen_task_id}_data', python_callable=split_example, op_kwargs={ } ) # This works, by utilizing a dirty hack PythonOperator.partial( task_id=f'process_{gen_task_id}_data', python_callable=process_split_data, op_kwargs={ } ).expand( templates_dict=XComArg(task) ) # This doesn't work, since the kwargs get expanded to PythonOperator, not to the callable. # PythonOperator.partial( # task_id=f'process_{gen_task_id}_data', # python_callable=process_split_data, # op_kwargs={ # } # ).expand_kwargs( # XComArg(task) # ) return grp def split_example(**kwargs): return [{'data': '1234', 'bucket_name': 'prod'}, {'data': '4321', 'bucket_name': 'test'}] def process_split_data(data=None, bucket_name=None, templates_dict={}, **kwargs): print('-' * 20) print('data in process_func:', data) print('bucket_name in process_func:', bucket_name) print('templates_dict in process_func:', templates_dict) print('-' * 20) print(get_data(**templates_dict, **kwargs)) def get_data(data=None, bucket_name=None, **kwargs): print('-' * 20) print('data in get_data:', data) print('bucket_name in get_data:', bucket_name) print('-' * 20) return data with dag: tg1 = gen_multiprocess_example( dag, gen_task_id='hope_this_helps' ) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org