This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 383ad31c76 Raise error when ``DagRun`` fails while running ``dag 
test`` (#36517)
383ad31c76 is described below

commit 383ad31c76411fb0a9f7d4243729d7bb0640ff0c
Author: Kaxil Naik <kaxiln...@apache.org>
AuthorDate: Tue Jan 2 16:08:55 2024 +0530

    Raise error when ``DagRun`` fails while running ``dag test`` (#36517)
    
    **Motivation**:
    
    Currently, when using `airflow dags test`, there is no easy way to know 
programmatically whether a DagRun failed, since the state is not stored in the 
DB. The only way to find out is to parse the log lines, as below:
    
    ```bash
    state=$(airflow dags test exception_dag | grep "DagRun Finished" | awk -F, '{for(i=1;i<=NF;i++) if ($i ~ / state=/) print $i}' | awk -F= '{print $2}')
    if [[ $state == "failed" ]]; then
      exit 1
    else
      exit 0
    fi
    ```
    
    With this PR, the `airflow dags test` command returns exit code 1 if the 
DagRun fails, which makes it easy to integrate into CI for testing.
---
 airflow/cli/commands/dag_command.py    |  5 ++++-
 tests/cli/commands/test_dag_command.py | 11 +++++++++++
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/airflow/cli/commands/dag_command.py 
b/airflow/cli/commands/dag_command.py
index 74609c41dc..a9d6aaf342 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -515,7 +515,7 @@ def dag_test(args, dag: DAG | None = None, session: Session 
= NEW_SESSION) -> No
             raise SystemExit(f"Configuration {args.conf!r} is not valid JSON. 
Error: {e}")
     execution_date = args.execution_date or timezone.utcnow()
     dag = dag or get_dag(subdir=args.subdir, dag_id=args.dag_id)
-    dag.test(execution_date=execution_date, run_conf=run_conf, session=session)
+    dr: DagRun = dag.test(execution_date=execution_date, run_conf=run_conf, 
session=session)
     show_dagrun = args.show_dagrun
     imgcat = args.imgcat_dagrun
     filename = args.save_dagrun
@@ -536,6 +536,9 @@ def dag_test(args, dag: DAG | None = None, session: Session 
= NEW_SESSION) -> No
         if show_dagrun:
             print(dot_graph.source)
 
+    if dr and dr.state == DagRunState.FAILED:
+        raise SystemExit("DagRun failed")
+
 
 @cli_utils.action_cli
 @providers_configuration_loaded
diff --git a/tests/cli/commands/test_dag_command.py 
b/tests/cli/commands/test_dag_command.py
index 30b5c475ea..4f16c381ad 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -43,6 +43,7 @@ from airflow.triggers.base import TriggerEvent
 from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
 from airflow.utils import timezone
 from airflow.utils.session import create_session
+from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunType
 from tests.models import TEST_DAGS_FOLDER
 from tests.test_utils.config import conf_vars
@@ -747,6 +748,16 @@ class TestCliDags:
             ]
         )
 
+    @mock.patch("airflow.cli.commands.dag_command.get_dag")
+    def test_dag_test_fail_raise_error(self, mock_get_dag):
+        execution_date_str = DEFAULT_DATE.isoformat()
+        mock_get_dag.return_value.test.return_value = DagRun(
+            dag_id="example_bash_operator", execution_date=DEFAULT_DATE, 
state=DagRunState.FAILED
+        )
+        cli_args = self.parser.parse_args(["dags", "test", 
"example_bash_operator", execution_date_str])
+        with pytest.raises(SystemExit, match=r"DagRun failed"):
+            dag_command.dag_test(cli_args)
+
     @mock.patch("airflow.cli.commands.dag_command.get_dag")
     @mock.patch("airflow.utils.timezone.utcnow")
     def test_dag_test_no_execution_date(self, mock_utcnow, mock_get_dag):

Reply via email to