This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b017ae38235 fix dagrun list limit (#64071)
b017ae38235 is described below
commit b017ae38235cbfb0f3855f03cf087cba3eaa7b8d
Author: Henry Chen <[email protected]>
AuthorDate: Wed Apr 1 20:03:01 2026 +0800
fix dagrun list limit (#64071)
* fix dagrun list limit
* add unit test for limit parameter
---
airflow-ctl/src/airflowctl/api/operations.py | 21 +++---
.../tests/airflow_ctl/api/test_operations.py | 81 +++++++++++++---------
2 files changed, 61 insertions(+), 41 deletions(-)
diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py
index 64424eff9c5..9c492022221 100644
--- a/airflow-ctl/src/airflowctl/api/operations.py
+++ b/airflow-ctl/src/airflowctl/api/operations.py
@@ -608,32 +608,33 @@ class DagRunOperations(BaseOperations):
dag_id: str | None = None,
) -> DAGRunCollectionResponse | ServerResponseError:
"""
- List all dag runs.
+ List dag runs (at most `limit` results).
Args:
state: Filter dag runs by state
start_date: Filter dag runs by start date (optional)
end_date: Filter dag runs by end date (optional)
- state: Filter dag runs by state
- limit: Limit the number of results
+ limit: Limit the number of results returned
dag_id: The DAG ID to filter by. If None, retrieves dag runs for all DAGs (using "~").
"""
# Use "~" for all DAGs if dag_id is not specified
if not dag_id:
dag_id = "~"
- params: dict[str, object] = {
- "state": state,
+ params: dict[str, Any] = {
+ "state": str(state),
"limit": limit,
}
if start_date is not None:
- params["start_date"] = start_date
+ params["start_date"] = start_date.isoformat()
if end_date is not None:
- params["end_date"] = end_date
+ params["end_date"] = end_date.isoformat()
- return super().execute_list(
- path=f"/dags/{dag_id}/dagRuns", data_model=DAGRunCollectionResponse, params=params
- )
+ try:
+ self.response = self.client.get(f"/dags/{dag_id}/dagRuns", params=params)
+ return DAGRunCollectionResponse.model_validate_json(self.response.content)
+ except ServerResponseError as e:
+ raise e
class JobsOperations(BaseOperations):
diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py b/airflow-ctl/tests/airflow_ctl/api/test_operations.py
index f75a3c9678f..82544098632 100644
--- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py
+++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py
@@ -1064,44 +1064,63 @@ class TestDagRunOperations:
)
assert response == self.dag_run_collection_response
- def test_list_all_dags(self):
- """Test listing dag runs for all DAGs using default dag_id='~'."""
-
- def handle_request(request: httpx.Request) -> httpx.Response:
- # When dag_id is "~", it should query all DAGs
- assert request.url.path == "/api/v2/dags/~/dagRuns"
+ @pytest.mark.parametrize(
+ (
+ "dag_id_input",
+ "state",
+ "limit",
+ "start_date",
+ "end_date",
+ "expected_path_suffix",
+ "expected_params_subset",
+ ),
+ [
+ # Test --limit with various values and configurations (covers CLI --limit flag)
+ ("dag1", "queued", 5, None, None, "dag1", {"state": "queued", "limit": "5"}),
+ (None, "running", 1, None, None, "~", {"state": "running", "limit": "1"}),
+ (
+ "example_dag",
+ "success",
+ 10,
+ None,
+ None,
+ "example_dag",
+ {"state": "success", "limit": "10"},
+ ),
+ ("dag2", "failed", 0, None, None, "dag2", {"state": "failed", "limit": "0"}),
+ ],
+ ids=["limit-5", "all-dags-limit-1", "string-state-limit-10", "limit-zero"],
+ )
+ def test_list_with_various_limits(
+ self,
+ dag_id_input: str | None,
+ state: str | DagRunState,
+ limit: int,
+ start_date: datetime.datetime | None,
+ end_date: datetime.datetime | None,
+ expected_path_suffix: str,
+ expected_params_subset: dict,
+ ) -> None:
+ """Test listing dag runs with various limit values (especially --limit flag)."""
+
+ def handle_request(request: httpx.Request) -> httpx.Response:
+ assert request.url.path.endswith(f"/dags/{expected_path_suffix}/dagRuns")
+ params = dict(request.url.params)
+ for key, value in expected_params_subset.items():
+ assert key in params
+ assert str(params[key]) == str(value)
return httpx.Response(200, json=json.loads(self.dag_run_collection_response.model_dump_json()))
client = make_api_client(transport=httpx.MockTransport(handle_request))
- # Call without specifying dag_id - should use default "~"
response = client.dag_runs.list(
- start_date=datetime.datetime(2025, 1, 1, 0, 0, 0),
- end_date=datetime.datetime(2025, 1, 1, 0, 0, 0),
- state="running",
- limit=1,
+ state=state,
+ limit=limit,
+ start_date=start_date,
+ end_date=end_date,
+ dag_id=dag_id_input,
)
assert response == self.dag_run_collection_response
- def test_list_with_optional_parameters(self):
- """Test listing dag runs with only some optional parameters."""
-
- def handle_request(request: httpx.Request) -> httpx.Response:
- assert request.url.path == "/api/v2/dags/dag1/dagRuns"
- # Verify that only state and limit are in query params
- params = dict(request.url.params)
- assert "state" in params
- assert params["state"] == "queued"
- assert "limit" in params
- assert params["limit"] == "5"
- # start_date and end_date should not be present
- assert "start_date" not in params
- assert "end_date" not in params
- return httpx.Response(200, json=json.loads(self.dag_run_collection_response.model_dump_json()))
-
- client = make_api_client(transport=httpx.MockTransport(handle_request))
- response = client.dag_runs.list(state="queued", limit=5, dag_id="dag1")
- assert response == self.dag_run_collection_response
-
class TestJobsOperations:
job_response = JobResponse(