jason810496 commented on code in PR #56351:
URL: https://github.com/apache/airflow/pull/56351#discussion_r2425277111


##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -95,6 +95,7 @@
 from airflow.utils.span_status import SpanStatus
 from airflow.utils.sqlalchemy import ExecutorConfigType, ExtendedJSON, 
UtcDateTime
 from airflow.utils.state import DagRunState, State, TaskInstanceState
+from typing import Optional

Review Comment:
   ```suggestion
   ```



##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -195,6 +196,7 @@ def clear_task_instances(
     session: Session,
     dag_run_state: DagRunState | Literal[False] = DagRunState.QUEUED,
     run_on_latest_version: bool = False,
+    prevent_running_task: Optional[bool] = None,

Review Comment:
   ```suggestion
       prevent_running_task: bool | None = None,
   ```



##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -215,15 +217,32 @@ def clear_task_instances(
     """
     task_instance_ids: list[str] = []
     from airflow.models.dagbag import DBDagBag
+    from airflow.exceptions import AirflowClearRunningTaskException
 
     scheduler_dagbag = DBDagBag(load_op_links=False)
     for ti in tis:
         task_instance_ids.append(ti.id)
         ti.prepare_db_for_next_try(session)
-        if ti.state == TaskInstanceState.RUNNING:
-            # If a task is cleared when running, set its state to RESTARTING 
so that
-            # the task is terminated and becomes eligible for retry.
-            ti.state = TaskInstanceState.RESTARTING
+
+        #Task instance state checks:
+        is_running = ti.state == TaskInstanceState.RUNNING
+        is_queued = ti.state == TaskInstanceState.QUEUED
+        is_scheduled = ti.state == TaskInstanceState.SCHEDULED

Review Comment:
   ```suggestion
           is_transitional = ti.state in {
               TaskInstanceState.RUNNING,
               TaskInstanceState.QUEUED,
               TaskInstanceState.SCHEDULED,
           }
   ```



##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -215,15 +217,32 @@ def clear_task_instances(
     """
     task_instance_ids: list[str] = []
     from airflow.models.dagbag import DBDagBag
+    from airflow.exceptions import AirflowClearRunningTaskException
 
     scheduler_dagbag = DBDagBag(load_op_links=False)
     for ti in tis:
         task_instance_ids.append(ti.id)
         ti.prepare_db_for_next_try(session)
-        if ti.state == TaskInstanceState.RUNNING:
-            # If a task is cleared when running, set its state to RESTARTING 
so that
-            # the task is terminated and becomes eligible for retry.
-            ti.state = TaskInstanceState.RESTARTING
+
+        #Task instance state checks:
+        is_running = ti.state == TaskInstanceState.RUNNING
+        is_queued = ti.state == TaskInstanceState.QUEUED
+        is_scheduled = ti.state == TaskInstanceState.SCHEDULED
+
+        if is_running or is_queued or is_scheduled:
+            if prevent_running_task and is_running:
+                raise 
AirflowClearRunningTaskException("AirflowClearRunningTaskException_RUNNING: 
Task is running, stopping attempt to clear.")
+                # Prevents the task from re-running and clearing when 
prevent_running_task and is_running is True.
+
+            elif prevent_running_task and (is_queued or is_scheduled):
+                raise 
AirflowClearRunningTaskException("AirflowClearRunningTaskException_QUEUED: Task 
is about to run or is scheduled to run, stopping attempt to clear.")
+                # Prevents the task from re-running and clearing when 
prevent_running_task is True and sends a warning toast.

Review Comment:
   ```suggestion
           if is_transitional:
               raise AirflowClearRunningTaskException("Disable 
'prevent_running_task' to proceed, or wait until the task is not running, 
queued, or scheduled state.")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to