jason810496 commented on code in PR #55660:
URL: https://github.com/apache/airflow/pull/55660#discussion_r2362153068


##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -215,12 +215,22 @@ def clear_task_instances(
     """
     task_instance_ids: list[str] = []
     from airflow.models.dagbag import DBDagBag
+    from fastapi import HTTPException
 
     scheduler_dagbag = DBDagBag(load_op_links=False)
+    isRunning = False
     for ti in tis:
         task_instance_ids.append(ti.id)
         ti.prepare_db_for_next_try(session)
+        if hasattr(ti, 'is_running_message') and ti.is_running_message == True:
+            isRunning = True

Review Comment:
   Naming is hard, but I think there is a better name for the `isRunning` 
logic here, since the purpose of `is_running_message` is closer to 
`confirm_clear_running`, `allow_clear_running_task`, etc.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -770,6 +770,9 @@ def post_clear_task_instances(
         )
 
     if not dry_run:
+        if hasattr(body, 'is_running_message'):

Review Comment:
   `body` is already a Pydantic schema, so a field declared on it is always 
present and the `hasattr` check looks redundant. Would it be better to access 
the attribute directly?
   ```suggestion
           if body.is_running_message:
   ```
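
To illustrate why the `hasattr` guard adds nothing, here is a minimal sketch. It uses a standard-library dataclass as a stand-in for the Pydantic schema so that it runs without third-party packages. The class name `ClearBodySketch` and the `False` default are assumptions for illustration, not the PR's actual schema.

```python
from dataclasses import dataclass


@dataclass
class ClearBodySketch:
    """Hypothetical stand-in for the request body schema."""

    dry_run: bool = True
    # Assumed default; the real schema may declare this differently.
    is_running_message: bool = False


# The client did not send the flag, yet the attribute still exists
# because it is declared on the class with a default value.
body = ClearBodySketch()
field_is_present = hasattr(body, "is_running_message")

# What the route actually cares about is the value, read directly.
flag_is_set = body.is_running_message
```

With a declared field, `hasattr` reports presence whether or not the flag was sent, so branching on the value itself is what distinguishes the two cases.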



##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -215,12 +215,22 @@ def clear_task_instances(
     """
     task_instance_ids: list[str] = []
     from airflow.models.dagbag import DBDagBag
+    from fastapi import HTTPException
 
     scheduler_dagbag = DBDagBag(load_op_links=False)
+    isRunning = False
     for ti in tis:
         task_instance_ids.append(ti.id)
         ti.prepare_db_for_next_try(session)
+        if hasattr(ti, 'is_running_message') and ti.is_running_message == True:
+            isRunning = True
+
         if ti.state == TaskInstanceState.RUNNING:
+            if isRunning == True:
+                raise HTTPException(
+                    status_code=400,
+                    detail=f"Task {ti.task_id} is running, restarting"
+                )

Review Comment:
   We should not raise FastAPI's `HTTPException` at the model level, because 
that couples Airflow Core to FastAPI.
   It would be better to raise `ValueError` in the model and let the router 
catch it and re-raise it as an `HTTPException`.
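
The split described above could look roughly like the following sketch. To stay dependency-free, it defines a small `HTTPException` stand-in with the same `status_code` and `detail` attributes as FastAPI's class; in the real router, `fastapi.HTTPException` would be used instead. The simplified function signatures, the dict-based task instances, the hypothetical `allow_clear_running` flag, and the condition that triggers the error are all assumptions for illustration, not the PR's logic.

```python
class HTTPException(Exception):
    """Stand-in mirroring the status_code/detail attributes of fastapi.HTTPException."""

    def __init__(self, status_code: int, detail: str) -> None:
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


def clear_task_instances(tis: list[dict], allow_clear_running: bool = False) -> list[str]:
    """Model-level sketch: raises a plain ValueError and knows nothing about HTTP."""
    cleared = []
    for ti in tis:
        # Assumed rule: refuse to clear a running task unless explicitly allowed.
        if ti["state"] == "running" and not allow_clear_running:
            raise ValueError(f"Task {ti['task_id']} is running")
        cleared.append(ti["task_id"])
    return cleared


def post_clear_task_instances(tis: list[dict], allow_clear_running: bool = False) -> list[str]:
    """Router-level sketch: translates the domain error into an HTTP error."""
    try:
        return clear_task_instances(tis, allow_clear_running)
    except ValueError as e:
        # Only the API layer decides which status code the error maps to.
        raise HTTPException(status_code=400, detail=str(e)) from e
```

Keeping the HTTP mapping in the router means the model function stays usable from non-HTTP callers such as the CLI or the scheduler.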



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
