This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new c0c24e49910 Added handler for cli command exit code in CloudComposerRunAirflowCLICommandOperator. (#54497) c0c24e49910 is described below commit c0c24e4991031c0010f9cfcb198a36e27a0baa28 Author: Nitochkin <62333822+crowi...@users.noreply.github.com> AuthorDate: Thu Aug 28 12:10:20 2025 +0200 Added handler for cli command exit code in CloudComposerRunAirflowCLICommandOperator. (#54497) Co-authored-by: Anton Nitochkin <nitoch...@google.com> --- .../providers/google/cloud/hooks/cloud_composer.py | 7 ++++++- .../providers/google/cloud/operators/cloud_composer.py | 12 +++++++++--- .../providers/google/cloud/triggers/cloud_composer.py | 17 +++++++++++++++-- .../unit/google/cloud/operators/test_cloud_composer.py | 6 ++++++ 4 files changed, 36 insertions(+), 6 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py index d7fbc438fd9..5026b63005e 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py @@ -642,7 +642,12 @@ class CloudComposerAsyncHook(GoogleBaseHook): self.log.exception("Exception occurred while polling CMD result") raise AirflowException(ex) - result_dict = PollAirflowCommandResponse.to_dict(result) + try: + result_dict = PollAirflowCommandResponse.to_dict(result) + except Exception as ex: + self.log.exception("Exception occurred while transforming PollAirflowCommandResponse") + raise AirflowException(ex) + if result_dict["output_end"]: return result_dict diff --git a/providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py b/providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py index 59704b3b4cb..8d5a13561d4 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py @@ -764,9 +764,15 @@ class CloudComposerRunAirflowCLICommandOperator(GoogleCloudBaseOperator): metadata=self.metadata, poll_interval=self.poll_interval, ) - result_str = self._merge_cmd_output_result(result) - self.log.info("Command execution result:\n%s", result_str) - return result + exit_code = result.get("exit_info", {}).get("exit_code") + if exit_code == 0: + result_str = self._merge_cmd_output_result(result) + self.log.info("Command execution result:\n%s", result_str) + return result + + error_output = "".join(line["content"] for line in result.get("error", [])) + message = f"Airflow CLI command failed with exit code {exit_code}.\nError output:\n{error_output}" + raise AirflowException(message) def execute_complete(self, context: Context, event: dict) -> dict: if event and event["status"] == "error": diff --git a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_composer.py b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_composer.py index 8b10d2048c5..4382b114bd4 100644 --- a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_composer.py +++ b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_composer.py @@ -145,10 +145,23 @@ class CloudComposerAirflowCLICommandTrigger(BaseTrigger): ) return + exit_code = result.get("exit_info", {}).get("exit_code") + + if exit_code == 0: + yield TriggerEvent( + { + "status": "success", + "result": result, + } + ) + return + + error_output = "".join(line["content"] for line in result.get("error", [])) + message = f"Airflow CLI command failed with exit code {exit_code}.\nError output:\n{error_output}" yield TriggerEvent( { - "status": "success", - "result": result, + "status": "error", + "message": message, } ) return diff --git a/providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py b/providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py index e882db9526a..e6da6cf5cf9 100644 --- a/providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py +++ b/providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py @@ -319,6 +319,12 @@ class TestCloudComposerRunAirflowCLICommandOperator: @mock.patch(COMPOSER_STRING.format("ExecuteAirflowCommandResponse.to_dict")) @mock.patch(COMPOSER_STRING.format("CloudComposerHook")) def test_execute(self, mock_hook, to_dict_mode) -> None: + mock_hook.return_value.wait_command_execution_result.return_value = { + "exit_info": {"exit_code": 0}, + "output": [ + {"content": "test"}, + ], + } op = CloudComposerRunAirflowCLICommandOperator( task_id=TASK_ID, project_id=TEST_GCP_PROJECT,