bugraoz93 commented on code in PR #67947:
URL: https://github.com/apache/airflow/pull/67947#discussion_r3349965080
##########
airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py:
##########
@@ -94,12 +94,17 @@ def date_param():
"dags update example_bash_operator --no-is-paused",
# Dag Run commands
"dagrun list --dag-id example_bash_operator --state success --limit=1",
- # XCom commands - need a Dag run with completed tasks
- 'xcom add example_bash_operator "manual__{date_param}" runme_0 {xcom_key}
\'{{"test": "value"}}\'',
- 'xcom get example_bash_operator "manual__{date_param}" runme_0 {xcom_key}',
- 'xcom list example_bash_operator "manual__{date_param}" runme_0',
- 'xcom edit example_bash_operator "manual__{date_param}" runme_0 {xcom_key}
\'{{"updated": "value"}}\'',
- 'xcom delete example_bash_operator "manual__{date_param}" runme_0
{xcom_key}',
+ # Task Instance commands
+ 'taskinstance list --dag-id=example_bash_operator
--dag-run-id="manual__{date_param}"',
+ 'taskinstance get --dag-id=example_bash_operator
--dag-run-id="manual__{date_param}" --task-id=runme_0',
+ "taskinstance clear --dag-id=example_bash_operator --dry-run",
+ 'taskinstance update --dag-id=example_bash_operator
--dag-run-id="manual__{date_param}" --task-id=runme_0 --new-state=success',
+ # XCom commands - need a DAG run with completed tasks
+ 'xcom add --dag-id=example_bash_operator
--dag-run-id="manual__{date_param}" --task-id=runme_0 --key={xcom_key}
--value=\'{{"test": "value"}}\'',
Review Comment:
I think if fields are mandatory they should be path parameter without any
param name to be optional. Can you rebase? Maybe you don't have those. Because
we shouldn't update old commands in this work, they should work as is and we
should ensure required fields are not passed as optional for taskinstance
commands too
##########
airflow-ctl/src/airflowctl/ctl/cli_config.py:
##########
@@ -52,6 +54,43 @@
BUILD_DOCS = "BUILDING_AIRFLOW_DOCS" in os.environ
+def _is_list_annotation(annotation: Any) -> bool:
+ """Check whether a Pydantic field annotation is a list type (including
Optional[list[...]])."""
+ origin = typing.get_origin(annotation)
+ if origin is list:
+ return True
+ # Handle both typing.Union (Optional[list[...]]) and PEP-604 X | Y
(types.UnionType)
+ if origin is typing.Union or isinstance(annotation,
builtin_types.UnionType):
+ return any(_is_list_annotation(arg) for arg in
typing.get_args(annotation) if arg is not type(None))
+ return False
+
+
+def _parse_task_ids_cli_arg(value: str) -> list:
Review Comment:
This looks bespoke to me, I don't think we should put task base exceptional
bespoke implementations here. We should add them to operation side if needed, I
assume we shouldn't have this much changes here
##########
airflow-ctl/src/airflowctl/api/operations.py:
##########
@@ -925,3 +929,46 @@ def list_import_errors(self) ->
PluginImportErrorCollectionResponse | ServerResp
return
PluginImportErrorCollectionResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
+
+
+
+class TaskInstanceOperations(BaseOperations):
Review Comment:
Operations name shpuld be tasks as this auto generated should be same as
Airlfow core cli
This creating is correct even though the static checks failing and PR not
finished. Please check the operation class naming
https://github.com/apache/airflow/pull/66926/changes
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]