potiuk commented on code in PR #66469:
URL: https://github.com/apache/airflow/pull/66469#discussion_r3215351516


##########
airflow-ctl/src/airflowctl/ctl/cli_config.py:
##########
@@ -52,6 +53,31 @@
 BUILD_DOCS = "BUILDING_AIRFLOW_DOCS" in os.environ
 
 
+def _is_list_annotation(annotation: Any) -> bool:
+    """
+    Check whether a Pydantic field annotation is a list type.
+
+    Handles ``Annotated[list[...] | None, ...]`` and similar wrapped forms
+    that ``typing.get_origin`` alone cannot detect.
+    """
+    origin = typing.get_origin(annotation)
+
+    # Direct list[...]
+    if origin is list:
+        return True
+
+    # Unwrap Annotated[X, ...]
+    if origin is typing.Annotated:
+        inner = typing.get_args(annotation)[0]
+        return _is_list_annotation(inner)
+
+    # Unwrap Union / X | None
+    if origin is typing.Union:
+        return any(_is_list_annotation(arg) for arg in 
typing.get_args(annotation) if arg is not type(None))

Review Comment:
   **Minor — possibly dead branch.** Per [Python typing 
docs](https://docs.python.org/3/library/typing.html#typing.get_origin), 
`typing.get_origin(Annotated[X, ...])` returns `get_origin(X)` (i.e. the 
`Annotated` marker is transparent to introspection). Additionally, Pydantic 
v2's `FieldInfo.annotation` strips the `Annotated` wrapper before storing, so 
by the time this helper runs against `model.model_fields[name].annotation`, the 
value is already `list[str] | None` (no `Annotated`).
   
   In that case `get_origin` returns `Union` directly, the function recurses 
into the union branch, and finds `list`. The explicit `if origin is 
typing.Annotated:` check on line 71 likely never fires.
   
   Worth either:
   1. A unit test that asserts `_is_list_annotation(Annotated[list[str], 'x'])` 
returns `True` to lock in the behaviour and confirm the branch is needed.
   2. Or trim the `Annotated` branch and rely on `get_origin`'s built-in 
transparency.
   
   (If you've observed a case where it WAS needed in practice, a one-line 
comment naming that case would help future readers understand why the branch is 
there.)



##########
airflow-ctl/src/airflowctl/api/operations.py:
##########
@@ -920,3 +928,51 @@ def list_import_errors(self) -> 
PluginImportErrorCollectionResponse | ServerResp
             return 
PluginImportErrorCollectionResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
+
+
+class TaskInstanceOperations(BaseOperations):
+    """Task instance operations."""
+
+    def _parse_task_instance_response(
+        self, data: dict | _list
+    ) -> TaskInstanceResponse | _list[TaskInstanceResponse] | 
TaskInstanceCollectionResponse:
+        """Parse task instance response data into appropriate models."""
+        if isinstance(data, list):
+            return [TaskInstanceResponse.model_validate(item) for item in data]
+        if "task_instances" in data:
+            return TaskInstanceCollectionResponse.model_validate(data)
+        return TaskInstanceResponse.model_validate(data)
+
+    def get(
+        self, dag_id: str, dag_run_id: str, task_id: str
+    ) -> TaskInstanceResponse | _list[TaskInstanceResponse] | 
TaskInstanceCollectionResponse:
+        """Get a task instance."""
+        self.response = 
self.client.get(f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}")
+        return self._parse_task_instance_response(self.response.json())
+
+    def list(self, dag_id: str, dag_run_id: str) -> 
TaskInstanceCollectionResponse | ServerResponseError:
+        """List task instances."""
+        return super().execute_list(
+            path=f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances",
+            data_model=TaskInstanceCollectionResponse,
+        )
+
+    def clear(
+        self, dag_id: str, body: ClearTaskInstancesBody
+    ) -> TaskInstanceCollectionResponse | ServerResponseError:
+        """Clear task instances."""
+        self.response = self.client.post(
+            f"dags/{dag_id}/clearTaskInstances",
+            json=body.model_dump(mode="json", exclude_unset=True),
+        )
+        return 
TaskInstanceCollectionResponse.model_validate_json(self.response.content)
+
+    def update(
+        self, dag_id: str, dag_run_id: str, task_id: str, body: 
PatchTaskInstanceBody
+    ) -> TaskInstanceResponse | _list[TaskInstanceResponse] | 
TaskInstanceCollectionResponse:
+        """Update a task instance."""
+        self.response = self.client.patch(
+            f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}",
+            json=body.model_dump(mode="json", exclude_unset=True),
+        )
+        return self._parse_task_instance_response(self.response.json())

Review Comment:
   **Major — return types are wider than the API contract.** I checked the 
OpenAPI spec and the route implementations to be sure:
   
   - `GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}` always 
returns one `TaskInstanceResponse` 
([`task_instances.py:get_task_instance`](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py)).
   - `PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}` always 
returns a `TaskInstanceCollectionResponse` — the implementation wraps results 
in a collection regardless of whether `map_index` is supplied 
(`task_instances.py:1226-1234`).
   - `POST /dags/{dag_id}/clearTaskInstances` always returns 
`TaskInstanceCollectionResponse`.
   
   None of these endpoints ever returns a bare list, so 
`_parse_task_instance_response`'s `isinstance(data, list)` branch is dead, and 
the union return types in `get()` and `update()` force every caller to 
type-narrow against shapes that won't appear.
   
   Matching the rest of this file (e.g. `ConnectionsOperations.get → 
ConnectionResponse`, `PoolsOperations.list → PoolCollectionResponse`), I'd 
commit to one concrete type per method:
   
   ```python
   def get(self, dag_id: str, dag_run_id: str, task_id: str) -> 
TaskInstanceResponse:
       self.response = self.client.get(
           f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}"
       )
       return TaskInstanceResponse.model_validate_json(self.response.content)
   
   def update(
       self, dag_id: str, dag_run_id: str, task_id: str, body: 
PatchTaskInstanceBody
   ) -> TaskInstanceCollectionResponse:
       self.response = self.client.patch(
           f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}",
           json=body.model_dump(mode="json", exclude_unset=True),
       )
       return 
TaskInstanceCollectionResponse.model_validate_json(self.response.content)
   ```
   
   …and drop `_parse_task_instance_response` entirely. The code becomes 
shorter, the types match the API contract, and CLI consumers don't have to 
introspect the result. Pairs with finding #2 (the tests for the dead branches).



##########
airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py:
##########
@@ -19,6 +19,7 @@
 from __future__ import annotations
 
 import json
+import re

Review Comment:
   **Minor — scope creep.** The changes in this file (`import re` move + 
ANSI-strip / line-wrap-tolerant assertion in `test_export_json_to_file`) are 
unrelated to the taskinstance commands this PR adds. They look correct in 
isolation, but they belong in a separate cleanup PR.
   
   Project convention is to keep PRs focused on one logical change — see 
[`05_pull_requests.rst`](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst).
 If these changes were needed to make CI green on top of this branch (e.g. a 
flaky test was blocking your PR), call it out in the PR body so a reviewer 
doesn't wonder why pool tests are touched in a taskinstance PR.
   
   Not a hard blocker — happy to wave it through if you call it out — but next 
time, splitting these would be cleaner.



##########
airflow-ctl/tests/airflow_ctl/api/test_operations.py:
##########
@@ -1843,3 +1848,242 @@ def handle_request(request: httpx.Request) -> 
httpx.Response:
         client = make_api_client(transport=httpx.MockTransport(handle_request))
         response = client.plugins.list_import_errors()
         assert response == self.plugin_import_error_collection_response
+
+
+class TestTaskInstanceOperations:
+    """Test suite for Task Instance operations."""
+
+    dag_id: str = "test_dag"
+    dag_run_id: str = "manual__2025-01-24T00:00:00+00:00"
+    task_id: str = "test_task"
+
+    task_instance_response = TaskInstanceResponse(
+        id=uuid.uuid4(),
+        task_id=task_id,
+        dag_id=dag_id,
+        dag_run_id=dag_run_id,
+        map_index=-1,
+        logical_date=datetime.datetime(2025, 1, 24, 0, 0, 0),
+        run_after=datetime.datetime(2025, 1, 24, 0, 0, 0),
+        start_date=datetime.datetime(2025, 1, 24, 0, 0, 1),
+        end_date=datetime.datetime(2025, 1, 24, 0, 0, 10),
+        duration=9.0,
+        state=TaskInstanceState.SUCCESS,
+        try_number=1,
+        max_tries=0,
+        task_display_name=task_id,
+        dag_display_name=dag_id,
+        hostname="hostname",
+        unixname="airflow",
+        pool="default_pool",
+        pool_slots=1,
+        queue="default",
+        priority_weight=1,
+        operator="EmptyOperator",
+        executor_config="{}",
+        note=None,
+    )
+
+    task_instance_collection_response = TaskInstanceCollectionResponse(
+        task_instances=[task_instance_response],
+        total_entries=1,
+    )
+
+    def test_get(self):
+        """Test fetching a single task instance."""
+
+        def handle_request(request: httpx.Request) -> httpx.Response:
+            assert request.url.path == (
+                
f"/api/v2/dags/{self.dag_id}/dagRuns/{self.dag_run_id}/taskInstances/{self.task_id}"
+            )
+            return httpx.Response(200, 
json=json.loads(self.task_instance_response.model_dump_json()))
+
+        client = make_api_client(transport=httpx.MockTransport(handle_request))
+        response = client.task_instances.get(
+            dag_id=self.dag_id,
+            dag_run_id=self.dag_run_id,
+            task_id=self.task_id,
+        )
+        assert response == self.task_instance_response
+
+    def test_get_list(self):

Review Comment:
   **Major — tests for response shapes the API never produces.** 
`test_get_list` (line 1909), `test_get_collection` (line 1926), 
`test_update_list` (line 2051), and `test_update_collection` (line 2070) mock 
the server returning shapes the actual REST API never emits (per the 
spec/implementation analysis in finding #1).
   
   These contribute no real coverage — they test that 
`_parse_task_instance_response` does what `_parse_task_instance_response` does. 
Worse, they'd hide a regression: if the dispatcher ever started returning the 
wrong type, these tests would pass against synthetic input while the real API 
path broke.
   
   Once #1 lands (single concrete return type per method), drop these four 
tests. Keep the canonical happy-path tests:
   
   - `test_get` (returns `TaskInstanceResponse`)
   - `test_list` (returns `TaskInstanceCollectionResponse`)
   - `test_clear` and `test_clear_with_options` (return 
`TaskInstanceCollectionResponse`)
   - `test_update` and `test_update_with_note` (return 
`TaskInstanceCollectionResponse`)
   
   That's still 6 unit tests covering all four endpoints with body-variant 
coverage on clear and update. Plenty for a CLI-facing operations class.



##########
airflow-ctl/docs/images/command_hashes.txt:
##########
@@ -1,5 +1,5 @@
-main:27a22c00dcf32e7a1a4f06672dc8e3c8
-assets:70619a2d92bda80930cde2aefcd8e1cd
+main:df0fbf2487ad50774d706a96d76f5c70
+assets:b3ae2b933e54528bf486ff28e887804d

Review Comment:
   **Question / nit.** Adding the `taskinstance` group to the help should only 
change the `main:` hash on line 1 (the top-level help gains a new entry). The 
`assets:` hash also changing on line 2 is suspicious — there's no 
taskinstance-related change visible to the assets help.
   
   Likely either:
   1. A regeneration artifact (the regenerator processed the whole tree and 
picked up unrelated drift somewhere upstream of you), or
   2. An accidental hash update from a regeneration that wasn't deterministic.
   
   Worth re-running the hash regeneration on a clean checkout of `main`, then 
on top of your branch, and confirming only `main` differs.



##########
airflow-ctl/src/airflowctl/api/operations.py:
##########
@@ -87,6 +91,10 @@
 
 T = TypeVar("T", bound=BaseModel)
 
+# Type alias used inside classes that define a ``list()`` method, which
+# shadows the builtin ``list`` and confuses mypy when used in annotations.
+_list = list

Review Comment:
   **Minor.** This works, but the bare `_list = list` at module scope reads as 
a typo at first glance (no import, no immediately-obvious purpose).
   
   The file already has `from __future__ import annotations` at the top, so 
annotations are strings at parse time and runtime shadowing of `list` by `def 
list(self, ...)` shouldn't reach mypy. If mypy is genuinely confused in 
practice, `from builtins import list as _list` communicates intent more clearly 
to a reader.
   
   And if findings #1+#2 land — none of the kept methods would have `list[...]` 
in their signatures, so the alias may be unnecessary entirely. Worth 
re-checking once those are applied.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to