EssKayz commented on issue #26619:
URL: https://github.com/apache/airflow/issues/26619#issuecomment-1256048784

   This should be a working example; essentially:
   ```python
   from datetime import datetime, timedelta

   from airflow import DAG, XComArg
   from airflow.operators.python import PythonOperator
   from airflow.utils.task_group import TaskGroup

   # Placeholder default_args (defined elsewhere in the real DAG); the
   # start_date stands in for the "one day ago" mentioned below.
   dag_args = {'start_date': datetime(2022, 9, 1)}

   dag = DAG('mockup_example',
             # With this option, even when the start date is "one day ago" we
             # instruct the airflow scheduler to only create a DAG Run for the
             # most recent instance of the DAG interval series.
             catchup=False,
             default_args=dag_args,
             default_view='graph',
             schedule_interval=None, dagrun_timeout=timedelta(minutes=30))
   
   
   def gen_multiprocess_example(
       dag: DAG,
       gen_task_id,
       **kwargs
   ) -> TaskGroup:
   
       with TaskGroup(dag=dag, group_id=f'mockup_{gen_task_id}',
                      prefix_group_id=False) as grp:
           task = PythonOperator(
               task_id=f'split_{gen_task_id}_data',
               python_callable=split_example,
               op_kwargs={
   
               }
           )
   
           # This works, by utilizing a dirty hack
           PythonOperator.partial(
               task_id=f'process_{gen_task_id}_data',
               python_callable=process_split_data,
               op_kwargs={
   
               }
           ).expand(
               templates_dict=XComArg(task)
           )
   
           # This doesn't work, since the kwargs get expanded to the
           # PythonOperator, not to the callable.
           # PythonOperator.partial(
           #     task_id=f'process_{gen_task_id}_data',
           #     python_callable=process_split_data,
           #     op_kwargs={
   
           #     }
           # ).expand_kwargs(
           #     XComArg(task)
           # ) 
   
       return grp
   
   
   def split_example(**kwargs):
       return [{'data': '1234', 'bucket_name': 'prod'},
               {'data': '4321', 'bucket_name': 'test'}]
   
   
   def process_split_data(data=None, bucket_name=None, templates_dict={}, **kwargs):
       print('-' * 20)
       print('data in process_func:', data)
       print('bucket_name in process_func:', bucket_name)
       print('templates_dict in process_func:', templates_dict)
       print('-' * 20)
       print(get_data(**templates_dict, **kwargs))
   
   
   def get_data(data=None, bucket_name=None, **kwargs):
       print('-' * 20)
       print('data in get_data:', data)
       print('bucket_name in get_data:', bucket_name)
       print('-' * 20)
       return data
   
   
   with dag:
       tg1 = gen_multiprocess_example(
           dag,
           gen_task_id='hope_this_helps'
       )
   ```
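
   For comparison, a rough sketch of how the same mapping could be written without the `templates_dict` detour, assuming an Airflow 2.3+ release where `op_kwargs` expands like any other operator argument (untested here). It is meant as a drop-in replacement for the `.expand(templates_dict=...)` block inside the TaskGroup above:

   ```python
   # Hypothetical variant (same TaskGroup and imports as above): each dict
   # returned by split_example() becomes the op_kwargs of one mapped task,
   # so process_split_data(data=..., bucket_name=...) is called directly.
   PythonOperator.partial(
       task_id=f'process_{gen_task_id}_data_mapped',
       python_callable=process_split_data,
   ).expand(
       op_kwargs=XComArg(task)
   )
   ```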

