This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-0-test by this push:
     new 583266c3d29 Fix clearTaskInstances API: Restore include_past/future 
support on UI (#54416) (#55272)
583266c3d29 is described below

commit 583266c3d29312b85c53621a7cda2b5b49a36e3a
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Fri Sep 5 12:03:34 2025 +0200

    Fix clearTaskInstances API: Restore include_past/future support on UI 
(#54416) (#55272)
    
    Co-authored-by: Gwak Beomgyu <[email protected]>
---
 .../core_api/routes/public/task_instances.py       |  4 +-
 .../core_api/routes/public/test_task_instances.py  | 72 +++++++++++++++++++++-
 2 files changed, 71 insertions(+), 5 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
index 715c15ed4e1..f794697123e 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -663,10 +663,10 @@ def post_clear_task_instances(
             error_message = f"Dag Run id {dag_run_id} not found in dag 
{dag_id}"
             raise HTTPException(status.HTTP_404_NOT_FOUND, error_message)
 
-        if past or future:
+        if (past or future) and dag_run.logical_date is None:
             raise HTTPException(
                 status.HTTP_400_BAD_REQUEST,
-                "Cannot use include_past or include_future when dag_run_id is 
provided because logical_date is not applicable.",
+                "Cannot use include_past or include_future with no 
logical_date(e.g. manually or asset-triggered).",
             )
         body.start_date = dag_run.logical_date if dag_run.logical_date is not 
None else None
         body.end_date = dag_run.logical_date if dag_run.logical_date is not 
None else None
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index 52f83780449..f668c08d242 100644
--- 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -2175,7 +2175,7 @@ class 
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
             check_last_log(session, dag_id=request_dag, 
event="post_clear_task_instances", logical_date=None)
 
     @pytest.mark.parametrize("flag", ["include_future", "include_past"])
-    def test_dag_run_with_future_or_past_flag_returns_400(self, test_client, 
session, flag):
+    def test_manual_run_with_none_logical_date_returns_400(self, test_client, 
session, flag):
         dag_id = "example_python_operator"
         payload = {
             "dry_run": True,
@@ -2183,7 +2183,7 @@ class 
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
             "only_failed": True,
             flag: True,
         }
-        task_instances = [{"logical_date": DEFAULT_DATETIME_1, "state": 
State.FAILED}]
+        task_instances = [{"logical_date": None, "state": State.FAILED}]
         self.create_task_instances(
             session,
             dag_id=dag_id,
@@ -2195,10 +2195,76 @@ class 
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
         response = test_client.post(f"/dags/{dag_id}/clearTaskInstances", 
json=payload)
         assert response.status_code == 400
         assert (
-            "Cannot use include_past or include_future when dag_run_id is 
provided"
+            "Cannot use include_past or include_future with no 
logical_date(e.g. manually or asset-triggered)."
             in response.json()["detail"]
         )
 
+    @pytest.mark.parametrize(
+        "flag, expected",
+        [
+            ("include_past", 2),  # T0 ~ T1
+            ("include_future", 2),  # T1 ~ T2
+        ],
+    )
+    def test_with_dag_run_id_and_past_future_converts_to_date_range(
+        self, test_client, session, flag, expected
+    ):
+        dag_id = "example_python_operator"
+        task_instances = [
+            {"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED},  # T0
+            {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1), 
"state": State.FAILED},  # T1
+            {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2), 
"state": State.FAILED},  # T2
+        ]
+        self.create_task_instances(session, dag_id=dag_id, 
task_instances=task_instances, update_extras=False)
+        payload = {
+            "dry_run": True,
+            "only_failed": True,
+            "dag_run_id": "TEST_DAG_RUN_ID_1",
+            flag: True,
+        }
+        resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances", 
json=payload)
+        assert resp.status_code == 200
+        assert resp.json()["total_entries"] == expected  # include_past => 
T0,T1 / include_future => T1,T2
+
+    def test_with_dag_run_id_and_both_past_and_future_means_full_range(self, 
test_client, session):
+        dag_id = "example_python_operator"
+        task_instances = [
+            {"logical_date": DEFAULT_DATETIME_1 - dt.timedelta(days=1), 
"state": State.FAILED},  # T0
+            {"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED},  # T1
+            {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1), 
"state": State.FAILED},  # T2
+            {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2), 
"state": State.FAILED},  # T3
+            {"logical_date": None, "state": State.FAILED},  # T4
+        ]
+        self.create_task_instances(session, dag_id=dag_id, 
task_instances=task_instances, update_extras=False)
+        payload = {
+            "dry_run": True,
+            "only_failed": False,
+            "dag_run_id": "TEST_DAG_RUN_ID_1",  # T1
+            "include_past": True,
+            "include_future": True,
+        }
+        resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances", 
json=payload)
+        assert resp.status_code == 200
+        assert resp.json()["total_entries"] == 5  # T0 ~ #T4
+
+    def test_with_dag_run_id_only_uses_run_id_based_clearing(self, 
test_client, session):
+        dag_id = "example_python_operator"
+        task_instances = [
+            {"logical_date": DEFAULT_DATETIME_1, "state": State.SUCCESS},  # T0
+            {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1), 
"state": State.FAILED},  # T1
+            {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2), 
"state": State.SUCCESS},  # T2
+        ]
+        self.create_task_instances(session, dag_id=dag_id, 
task_instances=task_instances, update_extras=False)
+        payload = {
+            "dry_run": True,
+            "only_failed": True,
+            "dag_run_id": "TEST_DAG_RUN_ID_1",
+        }
+        resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances", 
json=payload)
+        assert resp.status_code == 200
+        assert resp.json()["total_entries"] == 1
+        assert resp.json()["task_instances"][0]["logical_date"] == 
"2020-01-02T00:00:00Z"  # T1
+
     def test_should_respond_401(self, unauthenticated_test_client):
         response = unauthenticated_test_client.post(
             "/dags/dag_id/clearTaskInstances",

Reply via email to