AngeloPingoGalp commented on issue #27785:
URL: https://github.com/apache/airflow/issues/27785#issuecomment-1320848344
Hi Niko,
I simplified the code a bit, changed the schedule to every 5 minutes, and it worked.
Code:
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.subdag import SubDagOperator

DAG_NAME = "test_xcom_issue"
SUB_DAG_NAME = "subdag_writer"
SUB_DAG_FULL_NAME = f"{DAG_NAME}.{SUB_DAG_NAME}"
SUB_DAG_PUSH_TASK = "push_xcom_value_sub_dag"


def sub_dag() -> DAG:
    with DAG(
        dag_id=SUB_DAG_FULL_NAME,
        start_date=datetime(2022, 11, 19),
        catchup=False,
        schedule="* * * * *",
    ) as subdag:

        def extract(**kwargs):
            print("hello Im running")
            return "xcom_value_test"

        # Pushes its return value to XCom for the parent DAG to read.
        PythonOperator(
            task_id=SUB_DAG_PUSH_TASK,
            python_callable=extract,
            dag=subdag,
            do_xcom_push=True,
        )
    return subdag


with DAG(
    dag_id=DAG_NAME,
    start_date=datetime(2022, 11, 19),
    catchup=False,
    schedule="*/5 * * * *",
) as dag:

    def read_xcom(ti, **kwargs):
        # Pull the value pushed by the sub-DAG's task.
        return_value = ti.xcom_pull(
            task_ids=SUB_DAG_PUSH_TASK,
            dag_id=SUB_DAG_FULL_NAME,
        )
        print(f"I got this from xcom: {return_value}")

    bash_push_sub_dag = SubDagOperator(
        task_id=SUB_DAG_NAME,
        subdag=sub_dag(),
        dag=dag,
    )

    read_xcom_value = PythonOperator(
        task_id="read_xcom_value",
        python_callable=read_xcom,
    )

    bash_push_sub_dag >> read_xcom_value
Thank you for your help and clarification. However, we have some use cases
where the DAG's schedule is set to None and the pipeline is triggered externally
through the REST API. Is there a workaround for this issue in that use case?
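For reference, a schedule=None DAG can be triggered through Airflow's stable REST API by POSTing to the /api/v1/dags/{dag_id}/dagRuns endpoint. Below is a minimal sketch of building such a request with only the standard library; the webserver address and the actual auth setup are assumptions (the real call would also need an Authorization header matching your deployment):

```python
from __future__ import annotations

import json
from urllib import request

AIRFLOW_BASE = "http://localhost:8080"  # assumed webserver address
DAG_ID = "test_xcom_issue"


def build_trigger_request(dag_id: str, conf: dict | None = None) -> request.Request:
    """Build a POST request for Airflow's stable REST API dagRuns endpoint."""
    url = f"{AIRFLOW_BASE}/api/v1/dags/{dag_id}/dagRuns"
    payload = json.dumps({"conf": conf or {}}).encode()
    req = request.Request(url, data=payload, method="POST")
    req.add_header("Content-Type", "application/json")
    return req


req = build_trigger_request(DAG_ID, conf={"source": "external"})
# request.urlopen(req)  # would actually trigger the run; add auth first
```

Sending this request creates a new DAG run immediately, so the parent DAG and its SubDagOperator execute without any scheduler-driven interval.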
Best regards,
Angelo Pingo.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]