ziyangRen commented on issue #52276:
URL: https://github.com/apache/airflow/issues/52276#issuecomment-3017666287

   @sjyangkevin Thank you for your reply. We tried to reproduce the issue with 
the simplest DAG we could. The DAG code is below.
   `
   from __future__ import annotations
   
   import datetime
   
   import pendulum
   
   from airflow.models.dag import DAG
   from airflow.operators.bash import BashOperator
   from airflow.operators.empty import EmptyOperator
   
   with DAG(
       dag_id="example_bash_operator",
       schedule="0 0 * * *",
       start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Shanghai"),
       catchup=False,
       #dagrun_timeout=datetime.timedelta(minutes=60),
       tags=["example", "example2"],
       params={"example_key": "example_value"},
   ) as dag:
       run_this_last = EmptyOperator(
           task_id="run_this_last",
           queue='worker_03',
       )
   
    
       run_this = BashOperator(
           task_id="run_after_loop",
           bash_command="echo https://airflow.apache.org/",
           queue='worker_03',
       )
       
   
       run_this >> run_this_last
   
       for i in range(202):
           task = BashOperator(
               task_id="runme_{i}".format(i=i),
               bash_command='echo "{{ task_instance_key_str }}" && sleep 180',
               queue='worker_03',
           )
           task >> run_this
       task_72000 = BashOperator(
           task_id="runme_{i}".format(i='xx'),
           bash_command='echo "{{ task_instance_key_str }}" && sleep 72000',
           queue='worker_03',
       )
       task_72000 >> run_this
   
       task_12000_1 = BashOperator(
           task_id="runme_{i}".format(i='rzy'),
           bash_command='echo "{{ task_instance_key_str }}" && sleep 12000',
           queue='worker_02',
       )
       task_12000_1 >> run_this
   
       task_36000 = BashOperator(
           task_id="runme_{i}".format(i='rzy_1'),
           bash_command='echo "{{ task_instance_key_str }}" && sleep 36000',
           queue='worker_03',
       )
       task_36000 >> run_this
   
       for i in range(5):
           task = BashOperator(
               task_id="runme_rzy_{i}".format(i=i+2),
               bash_command='echo "{{ task_instance_key_str }}" && sleep 72000',
               queue='worker_03',
           )
           task >> run_this
   
       also_run_this = BashOperator(
           task_id="also_run_this",
           bash_command='echo "ti_key={{ task_instance_key_str }}" && sleep 600',
           queue='worker_03',
       )
    
       also_run_this >> run_this_last
   
   
   this_will_skip = BashOperator(
       task_id="this_will_skip",
       bash_command='echo "hello world"; exit 99;',
       queue='worker_03',
       dag=dag,
   )
   
   this_will_skip >> run_this_last
   `
   The following log messages can be seen:
   [2025-06-27T21:00:44.048+0800] {dagrun.py:1271} INFO - Restoring task '<TaskInstance: example_bash_operator1.runme_201 scheduled__2025-06-25T16:00:00+00:00 [removed]>' which was previously removed from DAG '<DAG: example_bash_operator1>'
   [2025-06-27T21:00:44.050+0800] {dagrun.py:1271} INFO - Restoring task '<TaskInstance: example_bash_operator1.runme_rzy_2 scheduled__2025-06-25T16:00:00+00:00 [removed]>' which was previously removed from DAG '<DAG: example_bash_operator1>'
   [2025-06-27T21:00:44.050+0800] {dagrun.py:1271} INFO - Restoring task '<TaskInstance: example_bash_operator1.runme_rzy_3 scheduled__2025-06-25T16:00:00+00:00 [removed]>' which was previously removed from DAG '<DAG: example_bash_operator1>'
   [2025-06-27T21:00:44.051+0800] {dagrun.py:1271} INFO - Restoring task '<TaskInstance: example_bash_operator1.runme_rzy_4 scheduled__2025-06-25T16:00:00+00:00 [removed]>' which was previously removed from DAG '<DAG: example_bash_operator1>'
   [2025-06-27T21:00:44.051+0800] {dagrun.py:1271} INFO - Restoring task '<TaskInstance: example_bash_operator1.runme_rzy_5 scheduled__2025-06-25T16:00:00+00:00 [removed]>' which was previously removed from DAG '<DAG: example_bash_operator1>'
   [2025-06-27T21:00:44.052+0800] {dagrun.py:1271} INFO - Restoring task '<TaskInstance: example_bash_operator1.runme_rzy_6 scheduled__2025-06-25T16:00:00+00:00 [removed]>' which was previously removed from DAG '<DAG: example_bash_operator1>'
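
   To check which task instances are being restored against the task IDs the DAG defines, the log can be summarized with a small script. This is only a rough sketch: the regular expression is written against the message layout shown above, and the names `RESTORE_RE` and `restored_tasks` are hypothetical helpers, not part of Airflow. It assumes the DAG ID contains no dots.

```python
import re

# Matches the "Restoring task" entries in the layout shown in the log above.
# \s+ is used between fields so the pattern also works on hard-wrapped log text.
# Assumption: the dag_id contains no "." (true for the DAG in this report).
RESTORE_RE = re.compile(
    r"Restoring task\s+'<TaskInstance:\s+"
    r"(?P<dag_id>[^.\s]+)\.(?P<task_id>\S+)\s+"
    r"(?P<run_id>\S+)\s+\[removed\]>'"
)


def restored_tasks(log_text):
    """Return a (dag_id, task_id, run_id) tuple for each 'Restoring task' entry."""
    return [
        match.group("dag_id", "task_id", "run_id")
        for match in RESTORE_RE.finditer(log_text)
    ]


if __name__ == "__main__":
    # One entry copied from the log above, including the e-mail line wrapping.
    sample = (
        "[2025-06-27T21:00:44.048+0800] {dagrun.py:1271} INFO - Restoring task \n"
        "'<TaskInstance: example_bash_operator1.runme_201 \n"
        "scheduled__2025-06-25T16:00:00+00:00 [removed]>' which was previously removed \n"
        "from DAG '<DAG: example_bash_operator1>'\n"
    )
    for dag_id, task_id, run_id in restored_tasks(sample):
        print(dag_id, task_id, run_id)
```

   Running it over a full log file and collecting the `task_id` values into a set makes it easy to see whether the same tasks are removed and restored repeatedly.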
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
