ziyangRen commented on issue #52276:
URL: https://github.com/apache/airflow/issues/52276#issuecomment-3017651039
@sjyangkevin Thank you for your reply. The DAG format we currently use is
relatively complex, so we used a simpler example DAG to test and
reproduce this issue. My DAG code is below.
```python
# -*- coding:utf-8 -*-
from __future__ import annotations

import datetime

import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="example_bash_operator",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Shanghai"),
    catchup=False,
    # dagrun_timeout=datetime.timedelta(minutes=60),
    tags=["example", "example2"],
    params={"example_key": "example_value"},
) as dag:
    run_this_last = EmptyOperator(
        task_id="run_this_last",
        queue='worker_03',
    )

    # [START howto_operator_bash]
    run_this = BashOperator(
        task_id="run_after_loop",
        bash_command="echo https://airflow.apache.org/",
        queue='worker_03',
    )
    # [END howto_operator_bash]

    run_this >> run_this_last

    # 202 short tasks (180 s each) on worker_03
    for i in range(202):
        task = BashOperator(
            task_id="runme_{i}".format(i=i),
            bash_command='echo "{{ task_instance_key_str }}" && sleep 180',
            queue='worker_03',
        )
        task >> run_this

    # Long-running tasks (12000 s to 72000 s)
    task_72000 = BashOperator(
        task_id="runme_{i}".format(i='xx'),
        bash_command='echo "{{ task_instance_key_str }}" && sleep 72000',
        queue='worker_03',
    )
    task_72000 >> run_this

    task_12000_1 = BashOperator(
        task_id="runme_{i}".format(i='rzy'),
        bash_command='echo "{{ task_instance_key_str }}" && sleep 12000',
        queue='worker_02',
    )
    task_12000_1 >> run_this

    task_36000 = BashOperator(
        task_id="runme_{i}".format(i='rzy_1'),
        bash_command='echo "{{ task_instance_key_str }}" && sleep 36000',
        queue='worker_03',
    )
    task_36000 >> run_this

    for i in range(5):
        task = BashOperator(
            task_id="runme_rzy_{i}".format(i=i+2),
            bash_command='echo "{{ task_instance_key_str }}" && sleep 72000',
            queue='worker_03',
        )
        task >> run_this

    # [START howto_operator_bash_template]
    also_run_this = BashOperator(
        task_id="also_run_this",
        bash_command='echo "ti_key={{ task_instance_key_str }}" && sleep 600',
        queue='worker_03',
    )
    # [END howto_operator_bash_template]
    also_run_this >> run_this_last

# [START howto_operator_bash_skip]
this_will_skip = BashOperator(
    task_id="this_will_skip",
    bash_command='echo "hello world"; exit 99;',
    queue='worker_03',
    dag=dag,
)
# [END howto_operator_bash_skip]
this_will_skip >> run_this_last
```
With this DAG, the following error message appears:
<img width="878" alt="Image"
src="https://github.com/user-attachments/assets/34594f1d-24dd-4fc9-8963-8488ed7a68a7"
/>
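
For reference, here is a rough tally of what the DAG above defines, written as plain Python with no Airflow dependency. The tuple layout and the one-hour threshold for "long-running" are my own choices for illustration, not anything Airflow defines; the task ids, queues, and sleep durations are copied from the DAG code.

```python
from collections import Counter

# (task_id, queue, sleep_seconds) for every task in the DAG above.
# sleep_seconds is 0 for tasks whose command does not sleep.
tasks = [("run_this_last", "worker_03", 0), ("run_after_loop", "worker_03", 0)]
tasks += [(f"runme_{i}", "worker_03", 180) for i in range(202)]
tasks += [
    ("runme_xx", "worker_03", 72000),
    ("runme_rzy", "worker_02", 12000),
    ("runme_rzy_1", "worker_03", 36000),
]
tasks += [(f"runme_rzy_{i + 2}", "worker_03", 72000) for i in range(5)]
tasks += [("also_run_this", "worker_03", 600), ("this_will_skip", "worker_03", 0)]

# Number of tasks routed to each queue.
per_queue = Counter(queue for _, queue, _ in tasks)

# Tasks that sleep for an hour or more (threshold chosen for illustration).
long_running = [task_id for task_id, _, secs in tasks if secs >= 3600]

print("total tasks:", len(tasks))
print("tasks per queue:", dict(per_queue))
print("long-running tasks:", long_running)
```

Almost all of the tasks go to `worker_03`, and only `runme_rzy` goes to `worker_02`.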