rawwar commented on code in PR #43506:
URL: https://github.com/apache/airflow/pull/43506#discussion_r1847117275
##########
tests/api_fastapi/core_api/routes/public/test_dag_run.py:
##########
@@ -156,6 +164,275 @@ def test_get_dag_run_not_found(self, test_client):
assert body["detail"] == "The DagRun with dag_id: `test_dag1` and
run_id: `invalid` was not found"
+class TestGetDagRuns:
+ @staticmethod
+ def parse_datetime(datetime_str):
+ return datetime_str.isoformat().replace("+00:00", "Z") if datetime_str
else None
+
+ @staticmethod
+ def get_dag_run_dict(run: DagRun):
+ return {
+ "run_id": run.run_id,
+ "dag_id": run.dag_id,
+ "logical_date": TestGetDagRuns.parse_datetime(run.logical_date),
+ "queued_at": TestGetDagRuns.parse_datetime(run.queued_at),
+ "start_date": TestGetDagRuns.parse_datetime(run.start_date),
+ "end_date": TestGetDagRuns.parse_datetime(run.end_date),
+ "data_interval_start":
TestGetDagRuns.parse_datetime(run.data_interval_start),
+ "data_interval_end":
TestGetDagRuns.parse_datetime(run.data_interval_end),
+ "last_scheduling_decision":
TestGetDagRuns.parse_datetime(run.last_scheduling_decision),
+ "run_type": run.run_type,
+ "state": run.state,
+ "external_trigger": run.external_trigger,
+ "triggered_by": run.triggered_by.value,
+ "conf": run.conf,
+ "note": run.note,
+ }
+
+ @pytest.mark.parametrize("dag_id, total_entries", [(DAG1_ID, 2), (DAG2_ID,
2), ("~", 4)])
+ def test_get_dag_runs(self, test_client, session, dag_id, total_entries):
+ response = test_client.get(f"/public/dags/{dag_id}/dagRuns")
+ assert response.status_code == 200
+ body = response.json()
+ assert body["total_entries"] == total_entries
+ for each in body["dag_runs"]:
+ run = (
+ session.query(DagRun)
+ .where(DagRun.dag_id == each["dag_id"], DagRun.run_id ==
each["run_id"])
+ .one()
+ )
+ expected = self.get_dag_run_dict(run)
+ assert each == expected
+
+ def test_get_dag_runs_not_found(self, test_client):
+ response = test_client.get("/public/dags/invalid/dagRuns")
+ assert response.status_code == 404
+ body = response.json()
+ assert body["detail"] == "The DAG with dag_id: `invalid` was not found"
+
+ def test_invalid_order_by_raises_400(self, test_client):
+ response =
test_client.get("/public/dags/test_dag1/dagRuns?order_by=invalid")
+ assert response.status_code == 400
+ body = response.json()
+ assert (
+ body["detail"]
+ == "Ordering with 'invalid' is disallowed or the attribute does
not exist on the model"
+ )
+
+ @pytest.mark.parametrize(
+ "order_by, expected_dag_id_order",
+ [
+ ("id", [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+ ("state", [DAG1_RUN2_ID, DAG1_RUN1_ID]),
+ ("dag_id", [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+ ("logical_date", [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+ ("dag_run_id", [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+ ("start_date", [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+ ("end_date", [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+ ("updated_at", [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+ ("external_trigger", [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+ ("conf", [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+ ],
+ )
+ def test_return_correct_results_with_order_by(self, test_client, order_by,
expected_dag_id_order):
+ response = test_client.get("/public/dags/test_dag1/dagRuns",
params={"order_by": order_by})
+ assert response.status_code == 200
+ body = response.json()
+ assert body["total_entries"] == 2
+ assert [each["run_id"] for each in body["dag_runs"]] ==
expected_dag_id_order
+
+ @pytest.mark.parametrize(
+ "query_params, expected_dag_id_order",
+ [
+ ({}, [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+ ({"limit": 1}, [DAG1_RUN1_ID]),
+ ({"limit": 3}, [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+ ({"offset": 1}, [DAG1_RUN2_ID]),
+ ({"offset": 2}, []),
+ ({"limit": 1, "offset": 1}, [DAG1_RUN2_ID]),
+ ({"limit": 1, "offset": 2}, []),
+ ],
+ )
+ def test_limit_and_offset(self, test_client, query_params,
expected_dag_id_order):
+ response = test_client.get("/public/dags/test_dag1/dagRuns",
params=query_params)
+ assert response.status_code == 200
+ body = response.json()
+ assert body["total_entries"] == 2
+ assert [each["run_id"] for each in body["dag_runs"]] ==
expected_dag_id_order
+
+ @pytest.mark.parametrize(
+ "query_params, expected_detail",
+ [
+ (
+ {"limit": 1, "offset": -1},
+ [
+ {
+ "type": "greater_than_equal",
+ "loc": ["query", "offset"],
+ "msg": "Input should be greater than or equal to 0",
+ "input": "-1",
+ "ctx": {"ge": 0},
+ }
+ ],
+ ),
+ (
+ {"limit": -1, "offset": 1},
+ [
+ {
+ "type": "greater_than_equal",
+ "loc": ["query", "limit"],
+ "msg": "Input should be greater than or equal to 0",
+ "input": "-1",
+ "ctx": {"ge": 0},
+ }
+ ],
+ ),
+ (
+ {"limit": -1, "offset": -1},
+ [
+ {
+ "type": "greater_than_equal",
+ "loc": ["query", "limit"],
+ "msg": "Input should be greater than or equal to 0",
+ "input": "-1",
+ "ctx": {"ge": 0},
+ },
+ {
+ "type": "greater_than_equal",
+ "loc": ["query", "offset"],
+ "msg": "Input should be greater than or equal to 0",
+ "input": "-1",
+ "ctx": {"ge": 0},
+ },
+ ],
+ ),
+ ],
+ )
+ def test_bad_limit_and_offset(self, test_client, query_params,
expected_detail):
+ response = test_client.get("/public/dags/test_dag1/dagRuns",
params=query_params)
+ assert response.status_code == 422
+ assert response.json()["detail"] == expected_detail
+
+ @pytest.mark.parametrize(
+ "dag_id, query_params, expected_dag_id_list",
+ [
+ (DAG1_ID, {"logical_date_gte": LOGICAL_DATE1.isoformat()},
[DAG1_RUN1_ID, DAG1_RUN2_ID]),
+ (DAG2_ID, {"logical_date_lte": LOGICAL_DATE3.isoformat()},
[DAG2_RUN1_ID]),
+ (
+ "~",
+ {
+ "start_date_gte": START_DATE1.isoformat(),
+ "start_date_lte": (START_DATE2 -
timedelta(days=1)).isoformat(),
+ },
+ [DAG1_RUN1_ID, DAG1_RUN2_ID],
+ ),
+ (
+ DAG1_ID,
+ {
+ "end_date_gte": START_DATE2.isoformat(),
+ "end_date_lte": (datetime.now(tz=timezone.utc) +
timedelta(days=1)).isoformat(),
+ },
+ [DAG1_RUN1_ID, DAG1_RUN2_ID],
+ ),
+ (
+ DAG1_ID,
+ {
+ "logical_date_gte": LOGICAL_DATE1.isoformat(),
+ "logical_date_lte": LOGICAL_DATE2.isoformat(),
+ },
+ [DAG1_RUN1_ID, DAG1_RUN2_ID],
+ ),
+ (
+ DAG2_ID,
+ {
+ "start_date_gte": START_DATE2.isoformat(),
+ "end_date_lte": (datetime.now(tz=timezone.utc) +
timedelta(days=1)).isoformat(),
+ },
+ [DAG2_RUN1_ID, DAG2_RUN2_ID],
+ ),
+ (DAG1_ID, {"state": DagRunState.SUCCESS.value}, [DAG1_RUN1_ID]),
+ (DAG2_ID, {"state": DagRunState.FAILED.value}, []),
+ (
+ DAG1_ID,
+ {"state": DagRunState.SUCCESS.value, "logical_date_gte":
LOGICAL_DATE1.isoformat()},
+ [DAG1_RUN1_ID],
+ ),
+ (
+ DAG1_ID,
+ {"state": DagRunState.FAILED.value, "start_date_gte":
START_DATE1.isoformat()},
+ [DAG1_RUN2_ID],
+ ),
+ ],
+ )
+ def test_filters(self, test_client, dag_id, query_params,
expected_dag_id_list):
+ response = test_client.get(f"/public/dags/{dag_id}/dagRuns",
params=query_params)
+ assert response.status_code == 200
+ body = response.json()
+ assert [each["run_id"] for each in body["dag_runs"]] ==
expected_dag_id_list
+
+ def test_bad_filters(self, test_client):
+ query_params = {
+ "logical_date_gte": "invalid",
+ "start_date_gte": "invalid",
+ "end_date_gte": "invalid",
+ "logical_date_lte": "invalid",
+ "start_date_lte": "invalid",
+ "end_date_lte": "invalid",
+ }
+ expected_detail = [
+ {
+ "type": "datetime_from_date_parsing",
+ "loc": ["query", "logical_date_gte"],
+ "msg": "Input should be a valid datetime or date, input is too
short",
+ "input": "invalid",
+ "ctx": {"error": "input is too short"},
+ },
+ {
+ "type": "datetime_from_date_parsing",
+ "loc": ["query", "logical_date_lte"],
+ "msg": "Input should be a valid datetime or date, input is too
short",
+ "input": "invalid",
+ "ctx": {"error": "input is too short"},
+ },
+ {
+ "type": "datetime_from_date_parsing",
+ "loc": ["query", "start_date_gte"],
+ "msg": "Input should be a valid datetime or date, input is too
short",
+ "input": "invalid",
+ "ctx": {"error": "input is too short"},
+ },
+ {
+ "type": "datetime_from_date_parsing",
+ "loc": ["query", "start_date_lte"],
+ "msg": "Input should be a valid datetime or date, input is too
short",
+ "input": "invalid",
+ "ctx": {"error": "input is too short"},
+ },
+ {
+ "type": "datetime_from_date_parsing",
+ "loc": ["query", "end_date_gte"],
+ "msg": "Input should be a valid datetime or date, input is too
short",
+ "input": "invalid",
+ "ctx": {"error": "input is too short"},
+ },
+ {
+ "type": "datetime_from_date_parsing",
+ "loc": ["query", "end_date_lte"],
+ "msg": "Input should be a valid datetime or date, input is too
short",
+ "input": "invalid",
+ "ctx": {"error": "input is too short"},
+ },
+ ]
+ response = test_client.get(f"/public/dags/{DAG1_ID}/dagRuns",
params=query_params)
+ assert response.status_code == 422
+ body = response.json()
+ assert body["detail"] == expected_detail
+
+ def test_invalid_state(self, test_client):
+ with pytest.raises(ValueError, match="'invalid' is not a valid
DagRunState"):
Review Comment:
I checked the List TI endpoint and it behaves the same way: it raises a ValueError.
So I kept this consistent, but I'm wondering whether this should return a 422 instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]