o-nikolas commented on issue #27785:
URL: https://github.com/apache/airflow/issues/27785#issuecomment-1320787388

   #### tl;dr: this actually is working, see my simplified example below. BUT 
there is an edge case where this _doesn't_ work for manual triggers of the 
parent dag.
   
   If you load the attached dag below into your airflow environment (simplified 
from what you provided) it will run every 5 minutes and the XCOM value from the 
task in the subdag will be correctly read back from the task in the parent dag. 
This works because `xcom_pull` now adds `run_id` to the XCOM query filter (see 
this PR: #19825). And when the parent dag is _scheduled_ both the parent and 
subdag have the same dag run id (something like `scheduled__<datetime>`).
   BUT, if you _manually_ trigger the parent dag its run id is now 
`manual__<datetime>` while the subdag is still `scheduled__<datetime>` which 
means the filtering on `run_id` excludes all xcom values from the child dag 
since it has a different dag run id. This is a legitimate regression that I 
think we should fix (@potiuk do you agree?).
   
   I think there may have been some bugs with your dag code,  but even if there 
wasn't, I assume you were manually triggering the parent dag, so the xcom could 
not be read successfully for the above reason.
     
   ```python
   from __future__ import annotations
   
   from datetime import datetime
   
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.operators.subdag import SubDagOperator
   
   DAG_NAME = "test_xcom"
   SUB_DAG_NAME = "subdag_writer"
   SUB_DAG_PUSH_TASK = "push_xcom_value_sub_dag"
   
   
   def sub_dag() -> DAG:
       with DAG(
           dag_id=f"{DAG_NAME}.{SUB_DAG_NAME}",
           start_date=datetime(2022, 1, 1),
           catchup=False,
           schedule="* * * * *",
       ) as subdag:
   
           def extract(**kwargs):
               print("hello Im running")
               return "xcom_value_test"
   
           PythonOperator(
               task_id=SUB_DAG_PUSH_TASK,
               python_callable=extract,
               dag=subdag,
               do_xcom_push=True,
           )
   
           return subdag
   
   
   with DAG(
       dag_id=DAG_NAME,
       start_date=datetime(2022, 1, 1),
       catchup=False,
       schedule="@once",
       # schedule="*/5 * * * *",
   ) as dag:
   
       def push_xcom(**kwargs):
           return "test_value"
   
       def read_xcom(ti, **kwargs):
           return_value = ti.xcom_pull(
               task_ids="push_xcom_value_sub_dag",
               dag_id="test_xcom.subdag_writer", key="return_value",
           )
           print(f"I got this from xcom: {return_value}")
   
       push_xcom_value = PythonOperator(
           task_id="push_xcom_value",
           python_callable=push_xcom,
       )
   
       subdag = sub_dag()
   
       bash_push_sub_dag = SubDagOperator(
           task_id=SUB_DAG_NAME,
           subdag=subdag,
           dag=dag,
       )
   
       read_xcom_value = PythonOperator(
           task_id="read_xcom_value",
           python_callable=read_xcom,
       )
   
       push_xcom_value >> bash_push_sub_dag >> read_xcom_value
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to