This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-0-test by this push:
new 583266c3d29 Fix clearTaskInstances API: Restore include_past/future support on UI (#54416) (#55272)
583266c3d29 is described below
commit 583266c3d29312b85c53621a7cda2b5b49a36e3a
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Fri Sep 5 12:03:34 2025 +0200
Fix clearTaskInstances API: Restore include_past/future support on UI (#54416) (#55272)
Co-authored-by: Gwak Beomgyu <[email protected]>
---
.../core_api/routes/public/task_instances.py | 4 +-
.../core_api/routes/public/test_task_instances.py | 72 +++++++++++++++++++++-
2 files changed, 71 insertions(+), 5 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
index 715c15ed4e1..f794697123e 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -663,10 +663,10 @@ def post_clear_task_instances(
error_message = f"Dag Run id {dag_run_id} not found in dag {dag_id}"
raise HTTPException(status.HTTP_404_NOT_FOUND, error_message)
- if past or future:
+ if (past or future) and dag_run.logical_date is None:
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
- "Cannot use include_past or include_future when dag_run_id is provided because logical_date is not applicable.",
+ "Cannot use include_past or include_future with no logical_date(e.g. manually or asset-triggered).",
)
body.start_date = dag_run.logical_date if dag_run.logical_date is not None else None
body.end_date = dag_run.logical_date if dag_run.logical_date is not None else None
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index 52f83780449..f668c08d242 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -2175,7 +2175,7 @@ class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
check_last_log(session, dag_id=request_dag,
event="post_clear_task_instances", logical_date=None)
@pytest.mark.parametrize("flag", ["include_future", "include_past"])
- def test_dag_run_with_future_or_past_flag_returns_400(self, test_client, session, flag):
+ def test_manual_run_with_none_logical_date_returns_400(self, test_client, session, flag):
dag_id = "example_python_operator"
payload = {
"dry_run": True,
@@ -2183,7 +2183,7 @@ class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
"only_failed": True,
flag: True,
}
- task_instances = [{"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED}]
+ task_instances = [{"logical_date": None, "state": State.FAILED}]
self.create_task_instances(
session,
dag_id=dag_id,
@@ -2195,10 +2195,76 @@ class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
response = test_client.post(f"/dags/{dag_id}/clearTaskInstances",
json=payload)
assert response.status_code == 400
assert (
- "Cannot use include_past or include_future when dag_run_id is provided"
+ "Cannot use include_past or include_future with no logical_date(e.g. manually or asset-triggered)."
in response.json()["detail"]
)
+ @pytest.mark.parametrize(
+ "flag, expected",
+ [
+ ("include_past", 2), # T0 ~ T1
+ ("include_future", 2), # T1 ~ T2
+ ],
+ )
+ def test_with_dag_run_id_and_past_future_converts_to_date_range(
+ self, test_client, session, flag, expected
+ ):
+ dag_id = "example_python_operator"
+ task_instances = [
+ {"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED}, # T0
+ {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1),
"state": State.FAILED}, # T1
+ {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2),
"state": State.FAILED}, # T2
+ ]
+ self.create_task_instances(session, dag_id=dag_id,
task_instances=task_instances, update_extras=False)
+ payload = {
+ "dry_run": True,
+ "only_failed": True,
+ "dag_run_id": "TEST_DAG_RUN_ID_1",
+ flag: True,
+ }
+ resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances",
json=payload)
+ assert resp.status_code == 200
+ assert resp.json()["total_entries"] == expected # include_past =>
T0,T1 / include_future => T1,T2
+
+ def test_with_dag_run_id_and_both_past_and_future_means_full_range(self,
test_client, session):
+ dag_id = "example_python_operator"
+ task_instances = [
+ {"logical_date": DEFAULT_DATETIME_1 - dt.timedelta(days=1),
"state": State.FAILED}, # T0
+ {"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED}, # T1
+ {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1),
"state": State.FAILED}, # T2
+ {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2),
"state": State.FAILED}, # T3
+ {"logical_date": None, "state": State.FAILED}, # T4
+ ]
+ self.create_task_instances(session, dag_id=dag_id,
task_instances=task_instances, update_extras=False)
+ payload = {
+ "dry_run": True,
+ "only_failed": False,
+ "dag_run_id": "TEST_DAG_RUN_ID_1", # T1
+ "include_past": True,
+ "include_future": True,
+ }
+ resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances",
json=payload)
+ assert resp.status_code == 200
+ assert resp.json()["total_entries"] == 5 # T0 ~ #T4
+
+ def test_with_dag_run_id_only_uses_run_id_based_clearing(self,
test_client, session):
+ dag_id = "example_python_operator"
+ task_instances = [
+ {"logical_date": DEFAULT_DATETIME_1, "state": State.SUCCESS}, # T0
+ {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1),
"state": State.FAILED}, # T1
+ {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2),
"state": State.SUCCESS}, # T2
+ ]
+ self.create_task_instances(session, dag_id=dag_id,
task_instances=task_instances, update_extras=False)
+ payload = {
+ "dry_run": True,
+ "only_failed": True,
+ "dag_run_id": "TEST_DAG_RUN_ID_1",
+ }
+ resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances",
json=payload)
+ assert resp.status_code == 200
+ assert resp.json()["total_entries"] == 1
+ assert resp.json()["task_instances"][0]["logical_date"] ==
"2020-01-02T00:00:00Z" # T1
+
def test_should_respond_401(self, unauthenticated_test_client):
response = unauthenticated_test_client.post(
"/dags/dag_id/clearTaskInstances",