This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 60fc407911 add output format arg for `cli.dags.trigger` (#29224)
60fc407911 is described below

commit 60fc40791121b19fe379e4216529b2138162b443
Author: Hussein Awala <houssein.awala...@gmail.com>
AuthorDate: Sun Feb 19 16:15:56 2023 +0100

    add output format arg for `cli.dags.trigger` (#29224)
    
    * add output format arg for cli.dags.trigger
    
    * add unit tests for local client
    
    * add unit test for dag trigger output
    
    * Update tests/api/client/test_local_client.py
    
    Co-authored-by: Ephraim Anierobi <splendidzig...@gmail.com>
    
    ---------
    
    Co-authored-by: Ephraim Anierobi <splendidzig...@gmail.com>
    Co-authored-by: eladkal <45845474+elad...@users.noreply.github.com>
---
 airflow/api/client/local_client.py     | 21 ++++++++++++++++--
 airflow/cli/cli_parser.py              | 11 +++++++++-
 airflow/cli/commands/dag_command.py    |  4 ++++
 tests/api/client/test_local_client.py  | 39 ++++++++++++++++++++++++++++++++++
 tests/cli/commands/test_dag_command.py | 25 ++++++++++++++++++++++
 5 files changed, 97 insertions(+), 3 deletions(-)

diff --git a/airflow/api/client/local_client.py b/airflow/api/client/local_client.py
index 2c8f471b39..afdcd0abd6 100644
--- a/airflow/api/client/local_client.py
+++ b/airflow/api/client/local_client.py
@@ -28,7 +28,9 @@ from airflow.models.pool import Pool
 class Client(api_client.Client):
     """Local API client implementation."""
 
-    def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True):
+    def trigger_dag(
+        self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True
+    ) -> dict | None:
         dag_run = trigger_dag.trigger_dag(
             dag_id=dag_id,
             run_id=run_id,
@@ -36,7 +38,22 @@ class Client(api_client.Client):
             execution_date=execution_date,
             replace_microseconds=replace_microseconds,
         )
-        return f"Created {dag_run}"
+        if dag_run:
+            return {
+                "conf": dag_run.conf,
+                "dag_id": dag_run.dag_id,
+                "dag_run_id": dag_run.run_id,
+                "data_interval_end": dag_run.data_interval_start,
+                "data_interval_start": dag_run.data_interval_end,
+                "end_date": dag_run.end_date,
+                "external_trigger": dag_run.external_trigger,
+                "last_scheduling_decision": dag_run.last_scheduling_decision,
+                "logical_date": dag_run.logical_date,
+                "run_type": dag_run.run_type,
+                "start_date": dag_run.start_date,
+                "state": dag_run.state,
+            }
+        return dag_run
 
     def delete_dag(self, dag_id):
         count = delete_dag.delete_dag(dag_id)
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 72026ade5c..993be83c0a 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -1180,7 +1180,16 @@ DAGS_COMMANDS = (
         name="trigger",
         help="Trigger a DAG run",
         func=lazy_load_command("airflow.cli.commands.dag_command.dag_trigger"),
-        args=(ARG_DAG_ID, ARG_SUBDIR, ARG_RUN_ID, ARG_CONF, ARG_EXEC_DATE, ARG_VERBOSE, ARG_REPLACE_MICRO),
+        args=(
+            ARG_DAG_ID,
+            ARG_SUBDIR,
+            ARG_RUN_ID,
+            ARG_CONF,
+            ARG_EXEC_DATE,
+            ARG_VERBOSE,
+            ARG_REPLACE_MICRO,
+            ARG_OUTPUT,
+        ),
     ),
     ActionCommand(
         name="delete",
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index 2df0a162c4..4a9a1f2c12 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -150,6 +150,10 @@ def dag_trigger(args):
             replace_microseconds=args.replace_microseconds,
         )
         print(message)
+        AirflowConsole().print_as(
+            data=[message] if message is not None else [],
+            output=args.output,
+        )
     except OSError as err:
         raise AirflowException(err)
 
diff --git a/tests/api/client/test_local_client.py b/tests/api/client/test_local_client.py
index ff2ea3b048..5527f00dea 100644
--- a/tests/api/client/test_local_client.py
+++ b/tests/api/client/test_local_client.py
@@ -128,6 +128,45 @@ class TestLocalClient:
             )
             mock.reset_mock()
 
+            # test output
+            queued_at = pendulum.now()
+            started_at = pendulum.now()
+            mock.return_value = DagRun(
+                dag_id=test_dag_id,
+                run_id=run_id,
+                queued_at=queued_at,
+                execution_date=EXECDATE,
+                start_date=started_at,
+                external_trigger=True,
+                state=DagRunState.QUEUED,
+                conf={},
+                run_type=DagRunType.MANUAL,
+                data_interval=(EXECDATE, EXECDATE + pendulum.duration(hours=1)),
+            )
+            expected_dag_run = {
+                "conf": {},
+                "dag_id": test_dag_id,
+                "dag_run_id": run_id,
+                "data_interval_end": EXECDATE,
+                "data_interval_start": EXECDATE + pendulum.duration(hours=1),
+                "end_date": None,
+                "external_trigger": True,
+                "last_scheduling_decision": None,
+                "logical_date": EXECDATE,
+                "run_type": DagRunType.MANUAL,
+                "start_date": started_at,
+                "state": DagRunState.QUEUED,
+            }
+            dag_run = self.client.trigger_dag(dag_id=test_dag_id)
+            assert expected_dag_run == dag_run
+            mock.reset_mock()
+
+            # test output when no DagRun is created
+            mock.return_value = None
+            dag_run = self.client.trigger_dag(dag_id=test_dag_id)
+            assert not dag_run
+            mock.reset_mock()
+
     def test_delete_dag(self):
         key = "my_dag_id"
 
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index 419b9bdccc..e667b3b1cd 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 
 import contextlib
 import io
+import json
 import os
 import tempfile
 from datetime import datetime, timedelta
@@ -608,6 +609,30 @@ class TestCliDags:
                 ),
             )
 
+    def test_trigger_dag_output_as_json(self):
+        args = self.parser.parse_args(
+            [
+                "dags",
+                "trigger",
+                "example_bash_operator",
+                "--run-id",
+                "trigger_dag_xxx",
+                "--conf",
+                '{"conf1": "val1", "conf2": "val2"}',
+                "--output=json",
+            ]
+        )
+        with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
+            dag_command.dag_trigger(args)
+            # get the last line from the logs ignoring all logging lines
+            out = temp_stdout.getvalue().strip().split("\n")[-1]
+        parsed_out = json.loads(out)
+
+        assert 1 == len(parsed_out)
+        assert "example_bash_operator" == parsed_out[0]["dag_id"]
+        assert "trigger_dag_xxx" == parsed_out[0]["dag_run_id"]
+        assert {"conf1": "val1", "conf2": "val2"} == parsed_out[0]["conf"]
+
     def test_delete_dag(self):
         DM = DagModel
         key = "my_dag_id"

Reply via email to