This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c0c24e49910 Added handler for cli command exit code in CloudComposerRunAirflowCLICommandOperator. (#54497)
c0c24e49910 is described below

commit c0c24e4991031c0010f9cfcb198a36e27a0baa28
Author: Nitochkin <62333822+crowi...@users.noreply.github.com>
AuthorDate: Thu Aug 28 12:10:20 2025 +0200

    Added handler for cli command exit code in CloudComposerRunAirflowCLICommandOperator. (#54497)
    
    Co-authored-by: Anton Nitochkin <nitoch...@google.com>
---
 .../providers/google/cloud/hooks/cloud_composer.py      |  7 ++++++-
 .../providers/google/cloud/operators/cloud_composer.py  | 12 +++++++++---
 .../providers/google/cloud/triggers/cloud_composer.py   | 17 +++++++++++++++--
 .../unit/google/cloud/operators/test_cloud_composer.py  |  6 ++++++
 4 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py
index d7fbc438fd9..5026b63005e 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py
@@ -642,7 +642,12 @@ class CloudComposerAsyncHook(GoogleBaseHook):
                 self.log.exception("Exception occurred while polling CMD 
result")
                 raise AirflowException(ex)
 
-            result_dict = PollAirflowCommandResponse.to_dict(result)
+            try:
+                result_dict = PollAirflowCommandResponse.to_dict(result)
+            except Exception as ex:
+                self.log.exception("Exception occurred while transforming 
PollAirflowCommandResponse")
+                raise AirflowException(ex)
+
             if result_dict["output_end"]:
                 return result_dict
 
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py b/providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py
index 59704b3b4cb..8d5a13561d4 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py
@@ -764,9 +764,15 @@ class CloudComposerRunAirflowCLICommandOperator(GoogleCloudBaseOperator):
             metadata=self.metadata,
             poll_interval=self.poll_interval,
         )
-        result_str = self._merge_cmd_output_result(result)
-        self.log.info("Command execution result:\n%s", result_str)
-        return result
+        exit_code = result.get("exit_info", {}).get("exit_code")
+        if exit_code == 0:
+            result_str = self._merge_cmd_output_result(result)
+            self.log.info("Command execution result:\n%s", result_str)
+            return result
+
+        error_output = "".join(line["content"] for line in result.get("error", 
[]))
+        message = f"Airflow CLI command failed with exit code 
{exit_code}.\nError output:\n{error_output}"
+        raise AirflowException(message)
 
     def execute_complete(self, context: Context, event: dict) -> dict:
         if event and event["status"] == "error":
diff --git a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_composer.py b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_composer.py
index 8b10d2048c5..4382b114bd4 100644
--- a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_composer.py
+++ b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_composer.py
@@ -145,10 +145,23 @@ class CloudComposerAirflowCLICommandTrigger(BaseTrigger):
             )
             return
 
+        exit_code = result.get("exit_info", {}).get("exit_code")
+
+        if exit_code == 0:
+            yield TriggerEvent(
+                {
+                    "status": "success",
+                    "result": result,
+                }
+            )
+            return
+
+        error_output = "".join(line["content"] for line in result.get("error", 
[]))
+        message = f"Airflow CLI command failed with exit code 
{exit_code}.\nError output:\n{error_output}"
         yield TriggerEvent(
             {
-                "status": "success",
-                "result": result,
+                "status": "error",
+                "message": message,
             }
         )
         return
diff --git a/providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py b/providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py
index e882db9526a..e6da6cf5cf9 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py
@@ -319,6 +319,12 @@ class TestCloudComposerRunAirflowCLICommandOperator:
     
@mock.patch(COMPOSER_STRING.format("ExecuteAirflowCommandResponse.to_dict"))
     @mock.patch(COMPOSER_STRING.format("CloudComposerHook"))
     def test_execute(self, mock_hook, to_dict_mode) -> None:
+        mock_hook.return_value.wait_command_execution_result.return_value = {
+            "exit_info": {"exit_code": 0},
+            "output": [
+                {"content": "test"},
+            ],
+        }
         op = CloudComposerRunAirflowCLICommandOperator(
             task_id=TASK_ID,
             project_id=TEST_GCP_PROJECT,

Reply via email to