jason810496 commented on code in PR #56351:
URL: https://github.com/apache/airflow/pull/56351#discussion_r2425277111
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -95,6 +95,7 @@
from airflow.utils.span_status import SpanStatus
from airflow.utils.sqlalchemy import ExecutorConfigType, ExtendedJSON, UtcDateTime
from airflow.utils.state import DagRunState, State, TaskInstanceState
+from typing import Optional
Review Comment:
```suggestion
```
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -195,6 +196,7 @@ def clear_task_instances(
session: Session,
dag_run_state: DagRunState | Literal[False] = DagRunState.QUEUED,
run_on_latest_version: bool = False,
+ prevent_running_task: Optional[bool] = None,
Review Comment:
```suggestion
prevent_running_task: bool | None = None,
```
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -215,15 +217,32 @@ def clear_task_instances(
"""
task_instance_ids: list[str] = []
from airflow.models.dagbag import DBDagBag
+ from airflow.exceptions import AirflowClearRunningTaskException
scheduler_dagbag = DBDagBag(load_op_links=False)
for ti in tis:
task_instance_ids.append(ti.id)
ti.prepare_db_for_next_try(session)
- if ti.state == TaskInstanceState.RUNNING:
- # If a task is cleared when running, set its state to RESTARTING so that
- # the task is terminated and becomes eligible for retry.
- ti.state = TaskInstanceState.RESTARTING
+
+ #Task instance state checks:
+ is_running = ti.state == TaskInstanceState.RUNNING
+ is_queued = ti.state == TaskInstanceState.QUEUED
+ is_scheduled = ti.state == TaskInstanceState.SCHEDULED
Review Comment:
```suggestion
is_transitional = ti.state in {
TaskInstanceState.RUNNING,
TaskInstanceState.QUEUED,
TaskInstanceState.SCHEDULED,
}
```
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -215,15 +217,32 @@ def clear_task_instances(
"""
task_instance_ids: list[str] = []
from airflow.models.dagbag import DBDagBag
+ from airflow.exceptions import AirflowClearRunningTaskException
scheduler_dagbag = DBDagBag(load_op_links=False)
for ti in tis:
task_instance_ids.append(ti.id)
ti.prepare_db_for_next_try(session)
- if ti.state == TaskInstanceState.RUNNING:
- # If a task is cleared when running, set its state to RESTARTING so that
- # the task is terminated and becomes eligible for retry.
- ti.state = TaskInstanceState.RESTARTING
+
+ #Task instance state checks:
+ is_running = ti.state == TaskInstanceState.RUNNING
+ is_queued = ti.state == TaskInstanceState.QUEUED
+ is_scheduled = ti.state == TaskInstanceState.SCHEDULED
+
+ if is_running or is_queued or is_scheduled:
+ if prevent_running_task and is_running:
+ raise AirflowClearRunningTaskException("AirflowClearRunningTaskException_RUNNING: Task is running, stopping attempt to clear.")
+ # Prevents the task from re-running and clearing when prevent_running_task and is_running is True.
+
+ elif prevent_running_task and (is_queued or is_scheduled):
+ raise AirflowClearRunningTaskException("AirflowClearRunningTaskException_QUEUED: Task is about to run or is scheduled to run, stopping attempt to clear.")
+ # Prevents the task from re-running and clearing when prevent_running_task is True and sends a warning toast.
Review Comment:
```suggestion
if is_transitional:
raise AirflowClearRunningTaskException("Disable 'prevent_running_task' to proceed, or wait until the task is no longer in a running, queued, or scheduled state.")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]