This is an automated email from the ASF dual-hosted git repository.
bugraoz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d5f67348396 bugfix(airflowctl): support primitive and datamodel in
operation parameters and fix trigger DagRun (#54073)
d5f67348396 is described below
commit d5f6734839608dd3953b99f7ff7f27436dec3f45
Author: Bugra Ozturk <[email protected]>
AuthorDate: Wed Aug 6 21:58:55 2025 +0200
bugfix(airflowctl): support primitive and datamodel in operation parameters
and fix trigger DagRun (#54073)
* bugfix(airflowctl): support both primitive and datamodel in the same
operation and fix trigger DagRun
* feat(api): allow none in conf while triggering dagrun while the
underlying table allows passing none
---
.../api_fastapi/core_api/datamodels/dag_run.py | 2 +-
.../core_api/openapi/v2-rest-api-generated.yaml | 6 ++-
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 11 ++++-
.../airflow/ui/openapi-gen/requests/types.gen.ts | 4 +-
airflow-ctl/src/airflowctl/api/operations.py | 7 ++--
airflow-ctl/src/airflowctl/ctl/cli_config.py | 47 +++++++++++++++-------
.../tests/airflow_ctl/api/test_operations.py | 9 ++---
7 files changed, 55 insertions(+), 31 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 901fc2a53a8..cc128809d43 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -102,7 +102,7 @@ class TriggerDAGRunPostBody(StrictBaseModel):
logical_date: AwareDatetime | None
run_after: datetime | None = Field(default_factory=timezone.utcnow)
- conf: dict = Field(default_factory=dict)
+ conf: dict | None = Field(default_factory=dict)
note: str | None = None
@model_validator(mode="after")
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 89c7a8a92b9..5c413bf03b9 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -11766,8 +11766,10 @@ components:
- type: 'null'
title: Run After
conf:
- additionalProperties: true
- type: object
+ anyOf:
+ - additionalProperties: true
+ type: object
+ - type: 'null'
title: Conf
note:
anyOf:
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index f83735ebd58..a5b4ac61e6a 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -5710,8 +5710,15 @@ export const $TriggerDAGRunPostBody = {
title: 'Run After'
},
conf: {
- additionalProperties: true,
- type: 'object',
+ anyOf: [
+ {
+ additionalProperties: true,
+ type: 'object'
+ },
+ {
+ type: 'null'
+ }
+ ],
title: 'Conf'
},
note: {
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 047f667ddef..5a7cd9d3cad 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1443,8 +1443,8 @@ export type TriggerDAGRunPostBody = {
logical_date: string | null;
run_after?: string | null;
conf?: {
- [key: string]: unknown;
- };
+ [key: string]: unknown;
+} | null;
note?: string | null;
};
diff --git a/airflow-ctl/src/airflowctl/api/operations.py
b/airflow-ctl/src/airflowctl/api/operations.py
index d7f62040ec7..fe56661d2d0 100644
--- a/airflow-ctl/src/airflowctl/api/operations.py
+++ b/airflow-ctl/src/airflowctl/api/operations.py
@@ -554,11 +554,10 @@ class DagRunOperations(BaseOperations):
self, dag_id: str, trigger_dag_run: TriggerDAGRunPostBody
) -> DAGRunResponse | ServerResponseError:
"""Create a dag run."""
+ if trigger_dag_run.conf is None:
+ trigger_dag_run.conf = {}
try:
- # It is model_dump_json() because it has unparsable json datetime
objects
- self.response = self.client.post(
- f"/dags/{dag_id}/dagRuns",
json=trigger_dag_run.model_dump_json()
- )
+ self.response = self.client.post(f"dags/{dag_id}/dagRuns",
json=trigger_dag_run.model_dump())
return DAGRunResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py
b/airflow-ctl/src/airflowctl/ctl/cli_config.py
index 29e55d4a36c..04cbee3d27f 100644
--- a/airflow-ctl/src/airflowctl/ctl/cli_config.py
+++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py
@@ -28,6 +28,7 @@ import inspect
import os
from argparse import Namespace
from collections.abc import Callable, Iterable
+from enum import Enum
from functools import partial
from pathlib import Path
from typing import Any, NamedTuple
@@ -361,7 +362,7 @@ class CommandFactory:
# Exclude parameters that are not needed for CLI from datamodels
self.excluded_parameters = ["schema_"]
# This list is used to determine if the command/operation needs to
output data
- self.output_command_list = ["list", "get", "create", "delete",
"update"]
+ self.output_command_list = ["list", "get", "create", "delete",
"update", "trigger"]
self.exclude_operation_names = ["LoginOperations",
"VersionOperations", "BaseOperations"]
self.exclude_method_names = [
"error",
@@ -371,6 +372,9 @@ class CommandFactory:
# Excluding bulk operation. Out of scope for CLI. Should use
implemented commands.
"bulk",
]
+ self.excluded_output_keys = [
+ "total_entries",
+ ]
def _inspect_operations(self) -> None:
"""Parse file and return matching Operation Method with details."""
@@ -571,6 +575,7 @@ class CommandFactory:
# Walk through all args and create a dictionary such as args.abc
-> {"abc": "value"}
method_params = {}
datamodel = None
+ datamodel_param_name = None
args_dict = vars(args)
for parameter in api_operation["parameters"]:
for parameter_key, parameter_type in parameter.items():
@@ -581,16 +586,24 @@ class CommandFactory:
else:
datamodel = getattr(generated_datamodels,
parameter_type)
for expanded_parameter in
self.datamodels_extended_map[parameter_type]:
+ if parameter_key not in method_params:
+ method_params[parameter_key] = {}
+ datamodel_param_name = parameter_key
if expanded_parameter in self.excluded_parameters:
continue
if expanded_parameter in args_dict.keys():
-
method_params[self._sanitize_method_param_key(expanded_parameter)] = (
- args_dict[expanded_parameter]
- )
+ method_params[parameter_key][
+
self._sanitize_method_param_key(expanded_parameter)
+ ] = args_dict[expanded_parameter]
if datamodel:
- method_params = datamodel.model_validate(method_params)
- method_output = operation_method_object(method_params)
+ if datamodel_param_name:
+ method_params[datamodel_param_name] =
datamodel.model_validate(
+ method_params[datamodel_param_name]
+ )
+ else:
+ method_params = datamodel.model_validate(method_params)
+ method_output = operation_method_object(**method_params)
else:
method_output = operation_method_object(**method_params)
@@ -605,22 +618,28 @@ class CommandFactory:
def check_operation_and_collect_list_of_dict(dict_obj: dict) ->
list:
"""Check if the object is a nested dictionary and collect list
of dictionaries."""
- if isinstance(dict_obj, dict):
- return [dict_obj]
def is_dict_nested(obj: dict) -> bool:
"""Check if the object is a nested dictionary."""
return any(isinstance(i, dict) or isinstance(i, list) for
i in obj.values())
- # Find result from list operation
if is_dict_nested(dict_obj):
- for _, value in dict_obj.items():
+ iteration_dict = dict_obj.copy()
+ for key, value in iteration_dict.items():
+ if key in self.excluded_output_keys:
+ del dict_obj[key]
+ continue
+ if isinstance(value, Enum):
+ dict_obj[key] = value.value
if isinstance(value, list):
- return value
+ dict_obj[key] = value
if isinstance(value, dict):
- result =
check_operation_and_collect_list_of_dict(value)
- if result:
- return result
+ dict_obj[key] =
check_operation_and_collect_list_of_dict(value)
+
+ # If dict_obj only have single key return value instead of list
+ # This can happen since we are excluding some keys from user
such as total_entries from list operations
+ if len(dict_obj) == 1:
+ return dict_obj[next(iter(dict_obj.keys()))]
# If not nested, return the object as a list which the result
should be already a dict
return [dict_obj]
diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py
b/airflow-ctl/tests/airflow_ctl/api/test_operations.py
index edb81007d46..c18751a8709 100644
--- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py
+++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py
@@ -898,8 +898,8 @@ class TestDagRunOperations:
data_interval_start=datetime.datetime(2025, 1, 1, 0, 0, 0),
data_interval_end=datetime.datetime(2025, 1, 1, 0, 0, 0),
last_scheduling_decision=datetime.datetime(2025, 1, 1, 0, 0, 0),
- run_type=DagRunType.MANUAL,
run_after=datetime.datetime(2025, 1, 1, 0, 0, 0),
+ run_type=DagRunType.MANUAL,
state=DagRunState.RUNNING,
triggered_by=DagRunTriggeredByType.UI,
conf={},
@@ -923,11 +923,8 @@ class TestDagRunOperations:
)
trigger_dag_run = TriggerDAGRunPostBody(
- dag_run_id=dag_run_id,
- data_interval_start=datetime.datetime(2025, 1, 1, 0, 0, 0),
- data_interval_end=datetime.datetime(2025, 1, 1, 0, 0, 0),
- conf={},
- note="note",
+ conf=None,
+ note=None,
)
def test_get(self):