ziyangRen commented on issue #52276:
URL: https://github.com/apache/airflow/issues/52276#issuecomment-3017666287
@sjyangkevin Thank you for your reply. We have attempted to use the simplest
possible DAG code to reproduce the issue. Below, I will provide my DAG code.
`
from __future__ import annotations
import datetime
import pendulum
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id="example_bash_operator",
schedule="0 0 * * *",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Shanghai"),
catchup=False,
#dagrun_timeout=datetime.timedelta(minutes=60),
tags=["example", "example2"],
params={"example_key": "example_value"},
) as dag:
run_this_last = EmptyOperator(
task_id="run_this_last",
queue='worker_03',
)
run_this = BashOperator(
task_id="run_after_loop",
bash_command="echo https://airflow.apache.org/",
queue='worker_03',
)
run_this >> run_this_last
for i in range(202):
task = BashOperator(
task_id="runme_{i}".format(i=i),
bash_command='echo "{{ task_instance_key_str }}" && sleep 180',
queue='worker_03',
)
task >> run_this
task_72000 = BashOperator(
task_id="runme_{i}".format(i='xx'),
bash_command='echo "{{ task_instance_key_str }}" && sleep 72000',
queue='worker_03',
)
task_72000 >> run_this
task_12000_1 = BashOperator(
task_id="runme_{i}".format(i='rzy'),
bash_command='echo "{{ task_instance_key_str }}" && sleep 12000',
queue='worker_02',
)
task_12000_1 >> run_this
task_36000 = BashOperator(
task_id="runme_{i}".format(i='rzy_1'),
bash_command='echo "{{ task_instance_key_str }}" && sleep 36000',
queue='worker_03',
)
task_36000 >> run_this
for i in range(5):
task = BashOperator(
task_id="runme_rzy_{i}".format(i=i+2),
bash_command='echo "{{ task_instance_key_str }}" && sleep 72000',
queue='worker_03',
)
task >> run_this
also_run_this = BashOperator(
task_id="also_run_this",
bash_command='echo "ti_key={{ task_instance_key_str }}" && sleep 600
',
queue='worker_03',
)
also_run_this >> run_this_last
this_will_skip = BashOperator(
task_id="this_will_skip",
bash_command='echo "hello world"; exit 99;',
queue='worker_03',
dag=dag,
)
this_will_skip >> run_this_last
`
The following error message can be seen.
[2025-06-27T21:00:44.048+0800] {dagrun.py:1271} INFO - Restoring task
'<TaskInstance: example_bash_operator1.runme_201
scheduled__2025-06-25T16:00:00+00:00 [removed]>' which was previously removed
from DAG '<DAG: example_bash_operator1>'
[2025-06-27T21:00:44.050+0800] {dagrun.py:1271} INFO - Restoring task
'<TaskInstance: example_bash_operator1.runme_rzy_2
scheduled__2025-06-25T16:00:00+00:00 [removed]>' which was previously removed
from DAG '<DAG: example_bash_operator1>'
[2025-06-27T21:00:44.050+0800] {dagrun.py:1271} INFO - Restoring task
'<TaskInstance: example_bash_operator1.runme_rzy_3
scheduled__2025-06-25T16:00:00+00:00 [removed]>' which was previously removed
from DAG '<DAG: example_bash_operator1>'
[2025-06-27T21:00:44.051+0800] {dagrun.py:1271} INFO - Restoring task
'<TaskInstance: example_bash_operator1.runme_rzy_4
scheduled__2025-06-25T16:00:00+00:00 [removed]>' which was previously removed
from DAG '<DAG: example_bash_operator1>'
[2025-06-27T21:00:44.051+0800] {dagrun.py:1271} INFO - Restoring task
'<TaskInstance: example_bash_operator1.runme_rzy_5
scheduled__2025-06-25T16:00:00+00:00 [removed]>' which was previously removed
from DAG '<DAG: example_bash_operator1>'
[2025-06-27T21:00:44.052+0800] {dagrun.py:1271} INFO - Restoring task
'<TaskInstance: example_bash_operator1.runme_rzy_6
scheduled__2025-06-25T16:00:00+00:00 [removed]>' which was previously removed
from DAG '<DAG: example_bash_operator1>'
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]