This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 60fc407911 add output format arg for `cli.dags.trigger` (#29224) 60fc407911 is described below commit 60fc40791121b19fe379e4216529b2138162b443 Author: Hussein Awala <houssein.awala...@gmail.com> AuthorDate: Sun Feb 19 16:15:56 2023 +0100 add output format arg for `cli.dags.trigger` (#29224) * add output format arg for cli.dags.trigger * add unit tests for local client * add unitest for dag trigger output * Update tests/api/client/test_local_client.py Co-authored-by: Ephraim Anierobi <splendidzig...@gmail.com> --------- Co-authored-by: Ephraim Anierobi <splendidzig...@gmail.com> Co-authored-by: eladkal <45845474+elad...@users.noreply.github.com> --- airflow/api/client/local_client.py | 21 ++++++++++++++++-- airflow/cli/cli_parser.py | 11 +++++++++- airflow/cli/commands/dag_command.py | 4 ++++ tests/api/client/test_local_client.py | 39 ++++++++++++++++++++++++++++++++++ tests/cli/commands/test_dag_command.py | 25 ++++++++++++++++++++++ 5 files changed, 97 insertions(+), 3 deletions(-) diff --git a/airflow/api/client/local_client.py b/airflow/api/client/local_client.py index 2c8f471b39..afdcd0abd6 100644 --- a/airflow/api/client/local_client.py +++ b/airflow/api/client/local_client.py @@ -28,7 +28,9 @@ from airflow.models.pool import Pool class Client(api_client.Client): """Local API client implementation.""" - def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True): + def trigger_dag( + self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True + ) -> dict | None: dag_run = trigger_dag.trigger_dag( dag_id=dag_id, run_id=run_id, @@ -36,7 +38,22 @@ class Client(api_client.Client): execution_date=execution_date, replace_microseconds=replace_microseconds, ) - return f"Created {dag_run}" + if dag_run: + return { + "conf": dag_run.conf, + "dag_id": dag_run.dag_id, + "dag_run_id": dag_run.run_id, + "data_interval_end": dag_run.data_interval_start, + "data_interval_start": dag_run.data_interval_end, + "end_date": dag_run.end_date, + "external_trigger": dag_run.external_trigger, + "last_scheduling_decision": dag_run.last_scheduling_decision, + "logical_date": dag_run.logical_date, + "run_type": dag_run.run_type, + "start_date": dag_run.start_date, + "state": dag_run.state, + } + return dag_run def delete_dag(self, dag_id): count = delete_dag.delete_dag(dag_id) diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index 72026ade5c..993be83c0a 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -1180,7 +1180,16 @@ DAGS_COMMANDS = ( name="trigger", help="Trigger a DAG run", func=lazy_load_command("airflow.cli.commands.dag_command.dag_trigger"), - args=(ARG_DAG_ID, ARG_SUBDIR, ARG_RUN_ID, ARG_CONF, ARG_EXEC_DATE, ARG_VERBOSE, ARG_REPLACE_MICRO), + args=( + ARG_DAG_ID, + ARG_SUBDIR, + ARG_RUN_ID, + ARG_CONF, + ARG_EXEC_DATE, + ARG_VERBOSE, + ARG_REPLACE_MICRO, + ARG_OUTPUT, + ), ), ActionCommand( name="delete", diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index 2df0a162c4..4a9a1f2c12 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -150,6 +150,10 @@ def dag_trigger(args): replace_microseconds=args.replace_microseconds, ) print(message) + AirflowConsole().print_as( + data=[message] if message is not None else [], + output=args.output, + ) except OSError as err: raise AirflowException(err) diff --git a/tests/api/client/test_local_client.py b/tests/api/client/test_local_client.py index ff2ea3b048..5527f00dea 100644 --- a/tests/api/client/test_local_client.py +++ b/tests/api/client/test_local_client.py @@ -128,6 +128,45 @@ class TestLocalClient: ) mock.reset_mock() + # test output + queued_at = pendulum.now() + started_at = pendulum.now() + mock.return_value = DagRun( + dag_id=test_dag_id, + run_id=run_id, + queued_at=queued_at, + execution_date=EXECDATE, + start_date=started_at, + external_trigger=True, + state=DagRunState.QUEUED, + conf={}, + run_type=DagRunType.MANUAL, + data_interval=(EXECDATE, EXECDATE + pendulum.duration(hours=1)), + ) + expected_dag_run = { + "conf": {}, + "dag_id": test_dag_id, + "dag_run_id": run_id, + "data_interval_end": EXECDATE, + "data_interval_start": EXECDATE + pendulum.duration(hours=1), + "end_date": None, + "external_trigger": True, + "last_scheduling_decision": None, + "logical_date": EXECDATE, + "run_type": DagRunType.MANUAL, + "start_date": started_at, + "state": DagRunState.QUEUED, + } + dag_run = self.client.trigger_dag(dag_id=test_dag_id) + assert expected_dag_run == dag_run + mock.reset_mock() + + # test output when no DagRun is created + mock.return_value = None + dag_run = self.client.trigger_dag(dag_id=test_dag_id) + assert not dag_run + mock.reset_mock() + def test_delete_dag(self): key = "my_dag_id" diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index 419b9bdccc..e667b3b1cd 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -19,6 +19,7 @@ from __future__ import annotations import contextlib import io +import json import os import tempfile from datetime import datetime, timedelta @@ -608,6 +609,30 @@ class TestCliDags: ), ) + def test_trigger_dag_output_as_json(self): + args = self.parser.parse_args( + [ + "dags", + "trigger", + "example_bash_operator", + "--run-id", + "trigger_dag_xxx", + "--conf", + '{"conf1": "val1", "conf2": "val2"}', + "--output=json", + ] + ) + with contextlib.redirect_stdout(io.StringIO()) as temp_stdout: + dag_command.dag_trigger(args) + # get the last line from the logs ignoring all logging lines + out = temp_stdout.getvalue().strip().split("\n")[-1] + parsed_out = json.loads(out) + + assert 1 == len(parsed_out) + assert "example_bash_operator" == parsed_out[0]["dag_id"] + assert "trigger_dag_xxx" == parsed_out[0]["dag_run_id"] + assert {"conf1": "val1", "conf2": "val2"} == parsed_out[0]["conf"] + def test_delete_dag(self): DM = DagModel key = "my_dag_id"