potiuk commented on code in PR #66469: URL: https://github.com/apache/airflow/pull/66469#discussion_r3215351516
##########
airflow-ctl/src/airflowctl/ctl/cli_config.py:
##########
@@ -52,6 +53,31 @@ BUILD_DOCS = "BUILDING_AIRFLOW_DOCS" in os.environ
+def _is_list_annotation(annotation: Any) -> bool:
+    """
+    Check whether a Pydantic field annotation is a list type.
+
+    Handles ``Annotated[list[...] | None, ...]`` and similar wrapped forms
+    that ``typing.get_origin`` alone cannot detect.
+    """
+    origin = typing.get_origin(annotation)
+
+    # Direct list[...]
+    if origin is list:
+        return True
+
+    # Unwrap Annotated[X, ...]
+    if origin is typing.Annotated:
+        inner = typing.get_args(annotation)[0]
+        return _is_list_annotation(inner)
+
+    # Unwrap Union / X | None
+    if origin is typing.Union:
+        return any(_is_list_annotation(arg) for arg in typing.get_args(annotation) if arg is not type(None))

Review Comment:
**Minor: possibly dead branch.**

Per the [Python typing docs](https://docs.python.org/3/library/typing.html#typing.get_origin), `typing.get_origin(Annotated[X, ...])` returns `Annotated` itself, so the branch is correct for an annotation that is still wrapped when it reaches this helper. The open question is whether such an annotation ever reaches it: Pydantic v2's `FieldInfo.annotation` strips the top-level `Annotated` wrapper before storing, so by the time this helper runs against `model.model_fields[name].annotation`, the value is already `list[str] | None` (no `Annotated`). In that case `get_origin` reports a union origin directly, the function recurses into the union branch, and finds `list`. The explicit `if origin is typing.Annotated:` check on line 71 likely never fires for annotations read from `model_fields`.

Worth either:

1. A unit test that asserts `_is_list_annotation(Annotated[list[str], 'x'])` returns `True` to lock in the behaviour and confirm the branch is needed.
2. Or trim the `Annotated` branch and rely on Pydantic having already unwrapped the annotation.

(If you've observed a case where it WAS needed in practice, a one-line comment naming that case would help future readers understand why the branch is there.)

##########
airflow-ctl/src/airflowctl/api/operations.py:
##########
@@ -920,3 +928,51 @@ def list_import_errors(self) -> PluginImportErrorCollectionResponse | ServerResp
             return PluginImportErrorCollectionResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
+
+
+class TaskInstanceOperations(BaseOperations):
+    """Task instance operations."""
+
+    def _parse_task_instance_response(
+        self, data: dict | _list
+    ) -> TaskInstanceResponse | _list[TaskInstanceResponse] | TaskInstanceCollectionResponse:
+        """Parse task instance response data into appropriate models."""
+        if isinstance(data, list):
+            return [TaskInstanceResponse.model_validate(item) for item in data]
+        if "task_instances" in data:
+            return TaskInstanceCollectionResponse.model_validate(data)
+        return TaskInstanceResponse.model_validate(data)
+
+    def get(
+        self, dag_id: str, dag_run_id: str, task_id: str
+    ) -> TaskInstanceResponse | _list[TaskInstanceResponse] | TaskInstanceCollectionResponse:
+        """Get a task instance."""
+        self.response = self.client.get(f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}")
+        return self._parse_task_instance_response(self.response.json())
+
+    def list(self, dag_id: str, dag_run_id: str) -> TaskInstanceCollectionResponse | ServerResponseError:
+        """List task instances."""
+        return super().execute_list(
+            path=f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances",
+            data_model=TaskInstanceCollectionResponse,
+        )
+
+    def clear(
+        self, dag_id: str, body: ClearTaskInstancesBody
+    ) -> TaskInstanceCollectionResponse | ServerResponseError:
+        """Clear task instances."""
+        self.response = self.client.post(
+            f"dags/{dag_id}/clearTaskInstances",
+            json=body.model_dump(mode="json", exclude_unset=True),
+        )
+        return TaskInstanceCollectionResponse.model_validate_json(self.response.content)
+
+    def update(
+        self, dag_id: str, dag_run_id: str, task_id: str, body: PatchTaskInstanceBody
+    ) -> TaskInstanceResponse | _list[TaskInstanceResponse] | TaskInstanceCollectionResponse:
+        """Update a task instance."""
+        self.response = self.client.patch(
+            f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}",
+            json=body.model_dump(mode="json", exclude_unset=True),
+        )
+        return self._parse_task_instance_response(self.response.json())

Review Comment:
**Major: return types are wider than the API contract.**

I checked the OpenAPI spec and the route implementations to be sure:

- `GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}` always returns one `TaskInstanceResponse` ([`task_instances.py:get_task_instance`](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py)).
- `PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}` always returns a `TaskInstanceCollectionResponse`: the implementation wraps results in a collection regardless of whether `map_index` is supplied (`task_instances.py:1226-1234`).
- `POST /dags/{dag_id}/clearTaskInstances` always returns `TaskInstanceCollectionResponse`.

None of these endpoints ever returns a bare list, so `_parse_task_instance_response`'s `isinstance(data, list)` branch is dead, and the union return types in `get()` and `update()` force every caller to type-narrow against shapes that won't appear.

Matching the rest of this file (e.g. `ConnectionsOperations.get → ConnectionResponse`, `PoolsOperations.list → PoolCollectionResponse`), I'd commit to one concrete type per method:

```python
def get(self, dag_id: str, dag_run_id: str, task_id: str) -> TaskInstanceResponse:
    self.response = self.client.get(
        f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}"
    )
    return TaskInstanceResponse.model_validate_json(self.response.content)

def update(
    self, dag_id: str, dag_run_id: str, task_id: str, body: PatchTaskInstanceBody
) -> TaskInstanceCollectionResponse:
    self.response = self.client.patch(
        f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}",
        json=body.model_dump(mode="json", exclude_unset=True),
    )
    return TaskInstanceCollectionResponse.model_validate_json(self.response.content)
```

Then drop `_parse_task_instance_response` entirely. The code becomes shorter, the types match the API contract, and CLI consumers don't have to introspect the result. Pairs with finding #2 (the tests for the dead branches).

##########
airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py:
##########
@@ -19,6 +19,7 @@ from __future__ import annotations
 import json
+import re

Review Comment:
**Minor: scope creep.**

The changes in this file (`import re` move + ANSI-strip / line-wrap-tolerant assertion in `test_export_json_to_file`) are unrelated to the taskinstance commands this PR adds. They look correct in isolation, but they belong in a separate cleanup PR. Project convention is to keep PRs focused on one logical change; see [`05_pull_requests.rst`](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst).

If these changes were needed to make CI green on top of this branch (e.g. a flaky test was blocking your PR), call it out in the PR body so a reviewer doesn't wonder why pool tests are touched in a taskinstance PR.

Not a hard blocker, and I'm happy to wave it through if you call it out, but next time, splitting these would be cleaner.

##########
airflow-ctl/tests/airflow_ctl/api/test_operations.py:
##########
@@ -1843,3 +1848,242 @@ def handle_request(request: httpx.Request) -> httpx.Response:
         client = make_api_client(transport=httpx.MockTransport(handle_request))
         response = client.plugins.list_import_errors()
         assert response == self.plugin_import_error_collection_response
+
+
+class TestTaskInstanceOperations:
+    """Test suite for Task Instance operations."""
+
+    dag_id: str = "test_dag"
+    dag_run_id: str = "manual__2025-01-24T00:00:00+00:00"
+    task_id: str = "test_task"
+
+    task_instance_response = TaskInstanceResponse(
+        id=uuid.uuid4(),
+        task_id=task_id,
+        dag_id=dag_id,
+        dag_run_id=dag_run_id,
+        map_index=-1,
+        logical_date=datetime.datetime(2025, 1, 24, 0, 0, 0),
+        run_after=datetime.datetime(2025, 1, 24, 0, 0, 0),
+        start_date=datetime.datetime(2025, 1, 24, 0, 0, 1),
+        end_date=datetime.datetime(2025, 1, 24, 0, 0, 10),
+        duration=9.0,
+        state=TaskInstanceState.SUCCESS,
+        try_number=1,
+        max_tries=0,
+        task_display_name=task_id,
+        dag_display_name=dag_id,
+        hostname="hostname",
+        unixname="airflow",
+        pool="default_pool",
+        pool_slots=1,
+        queue="default",
+        priority_weight=1,
+        operator="EmptyOperator",
+        executor_config="{}",
+        note=None,
+    )
+
+    task_instance_collection_response = TaskInstanceCollectionResponse(
+        task_instances=[task_instance_response],
+        total_entries=1,
+    )
+
+    def test_get(self):
+        """Test fetching a single task instance."""
+
+        def handle_request(request: httpx.Request) -> httpx.Response:
+            assert request.url.path == (
+                f"/api/v2/dags/{self.dag_id}/dagRuns/{self.dag_run_id}/taskInstances/{self.task_id}"
+            )
+            return httpx.Response(200, json=json.loads(self.task_instance_response.model_dump_json()))
+
+        client = make_api_client(transport=httpx.MockTransport(handle_request))
+        response = client.task_instances.get(
+            dag_id=self.dag_id,
+            dag_run_id=self.dag_run_id,
+            task_id=self.task_id,
+        )
+        assert response == self.task_instance_response
+
+    def test_get_list(self):

Review Comment:
**Major: tests for response shapes the API never produces.**

`test_get_list` (line 1909), `test_get_collection` (line 1926), `test_update_list` (line 2051), and `test_update_collection` (line 2070) mock the server returning shapes the actual REST API never emits (per the spec/implementation analysis in finding #1). These contribute no real coverage: they test that `_parse_task_instance_response` does what `_parse_task_instance_response` does. Worse, they'd hide a regression: if the dispatcher ever started returning the wrong type, these tests would pass against synthetic input while the real API path broke.

Once #1 lands (single concrete return type per method), drop these four tests. Keep the canonical happy-path tests:

- `test_get` (returns `TaskInstanceResponse`)
- `test_list` (returns `TaskInstanceCollectionResponse`)
- `test_clear` and `test_clear_with_options` (return `TaskInstanceCollectionResponse`)
- `test_update` and `test_update_with_note` (return `TaskInstanceCollectionResponse`)

That's still 6 unit tests covering all four endpoints with body-variant coverage on clear and update. Plenty for a CLI-facing operations class.

##########
airflow-ctl/docs/images/command_hashes.txt:
##########
@@ -1,5 +1,5 @@
-main:27a22c00dcf32e7a1a4f06672dc8e3c8
-assets:70619a2d92bda80930cde2aefcd8e1cd
+main:df0fbf2487ad50774d706a96d76f5c70
+assets:b3ae2b933e54528bf486ff28e887804d

Review Comment:
**Question / nit.**

Adding the `taskinstance` group to the help should only change the `main:` hash on line 1 (the top-level help gains a new entry). The `assets:` hash also changing on line 2 is suspicious, since there's no taskinstance-related change visible to the assets help. Likely either:

1. A regeneration artifact (the regenerator processed the whole tree and picked up unrelated drift somewhere upstream of you), or
2. An accidental hash update from a regeneration that wasn't deterministic.

Worth re-running the hash regeneration on a clean checkout of `main`, then on top of your branch, and confirming only `main` differs.

##########
airflow-ctl/src/airflowctl/api/operations.py:
##########
@@ -87,6 +91,10 @@ T = TypeVar("T", bound=BaseModel)
+# Type alias used inside classes that define a ``list()`` method, which
+# shadows the builtin ``list`` and confuses mypy when used in annotations.
+_list = list

Review Comment:
**Minor.**

This works, but the bare `_list = list` at module scope reads as a typo at first glance (no import, no immediately obvious purpose). The file already has `from __future__ import annotations` at the top, so annotations are stored as strings rather than evaluated when the class body runs, and the shadowing of `list` by `def list(self, ...)` is harmless at runtime. mypy resolves names by scope regardless of that import, so it can still reject a `list[...]` annotation that follows the method. If that is the error you hit, `from builtins import list as _list` communicates intent more clearly to a reader.

And if findings #1 and #2 land, none of the kept methods would have `list[...]` in their signatures, so the alias may be unnecessary entirely. Worth re-checking once those are applied.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
