This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b086a22e80d fix(glue): Fix GlueJobOperator verbose logs not showing in 
deferrable mode (#63086)
b086a22e80d is described below

commit b086a22e80dff25e82f2c80c54414a9b2f4008d5
Author: Shivam Rastogi <[email protected]>
AuthorDate: Fri Mar 27 16:37:26 2026 -0700

    fix(glue): Fix GlueJobOperator verbose logs not showing in deferrable mode 
(#63086)
    
    * fix(glue): Add verbose CloudWatch log streaming for deferrable 
GlueJobOperator
    
    When using GlueJobOperator with deferrable=True and verbose=True, CloudWatch
    logs were silently ignored because the trigger inherited the base waiter's
    run() method which only polls job status. This adds a run() override and
    _forward_logs() helper to the GlueJobCompleteTrigger that streams logs from
    both output and error CloudWatch log groups, matching the format used by the
synchronous path.
    
    Extract get_glue_log_group_names() and format_glue_logs() as module-level
    helpers in hooks/glue.py so that GlueJobHook.print_job_logs (sync) and
    GlueJobCompleteTrigger._forward_logs (async) share identical log formatting
    and log group name extraction logic.
    
    closes: #56535
    
    ---------
    
    Co-authored-by: Elad Kalif <[email protected]>
---
 .../src/airflow/providers/amazon/aws/hooks/glue.py |  36 ++--
 .../airflow/providers/amazon/aws/triggers/glue.py  | 107 +++++++++++-
 .../tests/unit/amazon/aws/triggers/test_glue.py    | 193 +++++++++++++++++++++
 3 files changed, 324 insertions(+), 12 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py 
b/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
index cc7cfd2849c..0e58d605689 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
@@ -42,6 +42,28 @@ DEFAULT_LOG_SUFFIX = "output"
 ERROR_LOG_SUFFIX = "error"
 
 
+def get_glue_log_group_names(job_run: dict[str, Any]) -> tuple[str, str]:
+    """Extract the output and error CloudWatch log group names from a Glue job 
run response."""
+    log_group_prefix = job_run["LogGroupName"]
+    return (
+        f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}",
+        f"{log_group_prefix}/{ERROR_LOG_SUFFIX}",
+    )
+
+
+def format_glue_logs(fetched_logs: list[str], log_group: str) -> str:
+    """
+    Format fetched CloudWatch log messages for display.
+
+    Shared between ``GlueJobHook.print_job_logs`` and 
``GlueJobCompleteTrigger._forward_logs``
+    so that both the sync and async paths produce identical output.
+    """
+    if fetched_logs:
+        messages = "\t".join(line.rstrip() + "\n" for line in fetched_logs)
+        return f"Glue Job Run {log_group} Logs:\n\t{messages}"
+    return f"No new log from the Glue Job in {log_group}"
+
+
 class GlueJobHook(AwsBaseHook):
     """
     Interact with AWS Glue.
@@ -350,22 +372,14 @@ class GlueJobHook(AwsBaseHook):
                 else:
                     raise
 
-            if len(fetched_logs):
-                # Add a tab to indent those logs and distinguish them from 
airflow logs.
-                # Log lines returned already contain a newline character at 
the end.
-                messages = "\t".join(fetched_logs)
-                self.log.info("Glue Job Run %s Logs:\n\t%s", log_group, 
messages)
-            else:
-                self.log.info("No new log from the Glue Job in %s", log_group)
+            self.log.info(format_glue_logs(fetched_logs, log_group))
             return next_token
 
-        log_group_prefix = job_run["LogGroupName"]
-        log_group_default = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
-        log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
+        log_group_output, log_group_error = get_glue_log_group_names(job_run)
         # one would think that the error log group would contain only errors, 
but it actually contains
         # a lot of interesting logs too, so it's valuable to have both
         continuation_tokens.output_stream_continuation = display_logs_from(
-            log_group_default, continuation_tokens.output_stream_continuation
+            log_group_output, continuation_tokens.output_stream_continuation
         )
         continuation_tokens.error_stream_continuation = display_logs_from(
             log_group_error, continuation_tokens.error_stream_continuation
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py 
b/providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py
index f54f761825e..e9d2e4ade04 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py
@@ -22,12 +22,21 @@ from collections.abc import AsyncIterator
 from functools import cached_property
 from typing import TYPE_CHECKING, Any
 
+from botocore.exceptions import ClientError
+
 if TYPE_CHECKING:
     from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
 
-from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook, 
GlueJobHook
+from airflow.providers.amazon.aws.hooks.glue import (
+    GlueDataQualityHook,
+    GlueJobHook,
+    format_glue_logs,
+    get_glue_log_group_names,
+)
 from airflow.providers.amazon.aws.hooks.glue_catalog import GlueCatalogHook
+from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
 from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger
+from airflow.providers.common.compat.sdk import AirflowException
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 
 
@@ -87,6 +96,102 @@ class GlueJobCompleteTrigger(AwsBaseWaiterTrigger):
             config=self.botocore_config,
         )
 
+    async def run(self) -> AsyncIterator[TriggerEvent]:
+        if not self.verbose:
+            async for event in super().run():
+                yield event
+            return
+
+        hook = self.hook()
+        async with (
+            await hook.get_async_conn() as glue_client,
+            await AwsLogsHook(
+                aws_conn_id=self.aws_conn_id, region_name=self.region_name
+            ).get_async_conn() as logs_client,
+        ):
+            # Get log group name from job run metadata and initial state in 
one call
+            job_run_resp = await 
glue_client.get_job_run(JobName=self.job_name, RunId=self.run_id)
+            log_group_output, log_group_error = 
get_glue_log_group_names(job_run_resp["JobRun"])
+
+            output_token: str | None = None
+            error_token: str | None = None
+            job_run_state = job_run_resp["JobRun"]["JobRunState"]
+
+            for _attempt in range(self.attempts):
+                # Fetch and print logs from both output and error streams
+                output_token = await self._forward_logs(
+                    logs_client, log_group_output, self.run_id, output_token
+                )
+                error_token = await self._forward_logs(logs_client, 
log_group_error, self.run_id, error_token)
+
+                if job_run_state in ("FAILED", "TIMEOUT"):
+                    raise AirflowException(
+                        f"Glue Job {self.job_name} Run {self.run_id} exited 
with state: {job_run_state}"
+                    )
+                if job_run_state in ("SUCCEEDED", "STOPPED"):
+                    self.log.info(
+                        "Exiting Job %s Run %s State: %s",
+                        self.job_name,
+                        self.run_id,
+                        job_run_state,
+                    )
+                    yield TriggerEvent({"status": "success", self.return_key: 
self.return_value})
+                    return
+
+                self.log.info(
+                    "Polling for AWS Glue Job %s current run state: %s",
+                    self.job_name,
+                    job_run_state,
+                )
+                await asyncio.sleep(self.waiter_delay)
+
+                # Fetch updated state for next iteration
+                resp = await glue_client.get_job_run(JobName=self.job_name, 
RunId=self.run_id)
+                job_run_state = resp["JobRun"]["JobRunState"]
+
+            raise AirflowException(
+                f"Glue Job {self.job_name} Run {self.run_id} waiter exceeded 
max attempts ({self.attempts})"
+            )
+
+    async def _forward_logs(
+        self,
+        logs_client: Any,
+        log_group: str,
+        log_stream: str,
+        next_token: str | None,
+    ) -> str | None:
+        # Matches the format used by the synchronous 
GlueJobHook.print_job_logs.
+        fetched_logs: list[str] = []
+        while True:
+            token_arg: dict[str, str] = {"nextToken": next_token} if 
next_token else {}
+            try:
+                response = await logs_client.get_log_events(
+                    logGroupName=log_group,
+                    logStreamName=log_stream,
+                    startFromHead=True,
+                    **token_arg,
+                )
+            except ClientError as e:
+                if e.response["Error"]["Code"] == "ResourceNotFoundException":
+                    self.log.warning(
+                        "No new Glue driver logs so far.\n"
+                        "If this persists, check the CloudWatch dashboard at: 
%r.",
+                        
f"https://{self.region_name}.console.aws.amazon.com/cloudwatch/home",
+                    )
+                    return None
+                raise
+
+            events = response["events"]
+            fetched_logs.extend(event["message"] for event in events)
+
+            if not events or next_token == response["nextForwardToken"]:
+                break
+            next_token = response["nextForwardToken"]
+
+        self.log.info(format_glue_logs(fetched_logs, log_group))
+
+        return response.get("nextForwardToken")
+
 
 class GlueCatalogPartitionTrigger(BaseTrigger):
     """
diff --git a/providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py 
b/providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py
index 4339d36ce38..b40dcd2188a 100644
--- a/providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py
+++ b/providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py
@@ -21,9 +21,11 @@ from unittest import mock
 from unittest.mock import AsyncMock
 
 import pytest
+from botocore.exceptions import ClientError
 
 from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook, 
GlueJobHook
 from airflow.providers.amazon.aws.hooks.glue_catalog import GlueCatalogHook
+from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
 from airflow.providers.amazon.aws.triggers.glue import (
     GlueCatalogPartitionTrigger,
     GlueDataQualityRuleRecommendationRunCompleteTrigger,
@@ -111,6 +113,197 @@ class TestGlueJobTrigger:
             "waiter_delay": 10,
         }
 
+    def test_serialization_verbose(self):
+        trigger = GlueJobCompleteTrigger(
+            job_name="job_name",
+            run_id="JobRunId",
+            verbose=True,
+            aws_conn_id="aws_conn_id",
+            waiter_max_attempts=3,
+            waiter_delay=10,
+        )
+        classpath, kwargs = trigger.serialize()
+        assert kwargs["verbose"] is True
+
+    @pytest.mark.asyncio
+    @mock.patch.object(AwsLogsHook, "get_async_conn")
+    @mock.patch.object(GlueJobHook, "get_async_conn")
+    async def test_verbose_run_success(self, mock_glue_conn, mock_logs_conn):
+        """When verbose=True, the trigger polls job state and fetches 
CloudWatch logs."""
+        glue_client = AsyncMock()
+        glue_client.get_job_run = AsyncMock(
+            side_effect=[
+                # First call: metadata + initial state (RUNNING)
+                {"JobRun": {"JobRunState": "RUNNING", "LogGroupName": 
"/aws-glue/python-jobs"}},
+                # Second call: state update after sleep (SUCCEEDED)
+                {"JobRun": {"JobRunState": "SUCCEEDED", "LogGroupName": 
"/aws-glue/python-jobs"}},
+            ]
+        )
+        mock_glue_conn.return_value.__aenter__ = 
AsyncMock(return_value=glue_client)
+        mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        logs_client = AsyncMock()
+        logs_client.get_log_events = AsyncMock(
+            return_value={
+                "events": [{"timestamp": 1234, "message": "Processing step 
1\n"}],
+                "nextForwardToken": "token_1",
+            }
+        )
+        mock_logs_conn.return_value.__aenter__ = 
AsyncMock(return_value=logs_client)
+        mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        trigger = GlueJobCompleteTrigger(
+            job_name="job_name",
+            run_id="jr_123",
+            verbose=True,
+            aws_conn_id="aws_conn_id",
+            waiter_delay=0,
+            waiter_max_attempts=5,
+        )
+        generator = trigger.run()
+        event = await generator.asend(None)
+
+        assert event.payload["status"] == "success"
+        assert event.payload["run_id"] == "jr_123"
+        # Logs client was called for both output and error streams
+        assert logs_client.get_log_events.call_count >= 2
+
+    @pytest.mark.asyncio
+    @mock.patch.object(AwsLogsHook, "get_async_conn")
+    @mock.patch.object(GlueJobHook, "get_async_conn")
+    async def test_verbose_run_job_failed(self, mock_glue_conn, 
mock_logs_conn):
+        """When verbose=True and the job fails, the trigger raises 
AirflowException."""
+        glue_client = AsyncMock()
+        glue_client.get_job_run = AsyncMock(
+            return_value={"JobRun": {"JobRunState": "FAILED", "LogGroupName": 
"/aws-glue/python-jobs"}}
+        )
+        mock_glue_conn.return_value.__aenter__ = 
AsyncMock(return_value=glue_client)
+        mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        logs_client = AsyncMock()
+        logs_client.get_log_events = AsyncMock(return_value={"events": [], 
"nextForwardToken": "token_1"})
+        mock_logs_conn.return_value.__aenter__ = 
AsyncMock(return_value=logs_client)
+        mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        trigger = GlueJobCompleteTrigger(
+            job_name="job_name",
+            run_id="jr_123",
+            verbose=True,
+            aws_conn_id="aws_conn_id",
+            waiter_delay=0,
+            waiter_max_attempts=5,
+        )
+        with pytest.raises(AirflowException, match="FAILED"):
+            async for _ in trigger.run():
+                pass
+
+    @pytest.mark.asyncio
+    @mock.patch.object(AwsLogsHook, "get_async_conn")
+    @mock.patch.object(GlueJobHook, "get_async_conn")
+    async def test_verbose_run_max_attempts(self, mock_glue_conn, 
mock_logs_conn):
+        """When verbose=True and the job stays RUNNING past max attempts, 
raises AirflowException."""
+        glue_client = AsyncMock()
+        glue_client.get_job_run = AsyncMock(
+            return_value={"JobRun": {"JobRunState": "RUNNING", "LogGroupName": 
"/aws-glue/python-jobs"}}
+        )
+        mock_glue_conn.return_value.__aenter__ = 
AsyncMock(return_value=glue_client)
+        mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        logs_client = AsyncMock()
+        logs_client.get_log_events = AsyncMock(return_value={"events": [], 
"nextForwardToken": "token_1"})
+        mock_logs_conn.return_value.__aenter__ = 
AsyncMock(return_value=logs_client)
+        mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        trigger = GlueJobCompleteTrigger(
+            job_name="job_name",
+            run_id="jr_123",
+            verbose=True,
+            aws_conn_id="aws_conn_id",
+            waiter_delay=0,
+            waiter_max_attempts=2,
+        )
+        with pytest.raises(AirflowException, match="max attempts"):
+            async for _ in trigger.run():
+                pass
+
+    @pytest.mark.asyncio
+    async def test_forward_logs_resource_not_found(self):
+        """_forward_logs handles ResourceNotFoundException gracefully."""
+        logs_client = AsyncMock()
+        logs_client.get_log_events = AsyncMock(
+            side_effect=ClientError(
+                {"Error": {"Code": "ResourceNotFoundException", "Message": 
"not found"}},
+                "GetLogEvents",
+            )
+        )
+
+        trigger = GlueJobCompleteTrigger(
+            job_name="job_name",
+            run_id="jr_123",
+            verbose=True,
+            aws_conn_id="aws_conn_id",
+            region_name="us-east-1",
+            waiter_delay=0,
+            waiter_max_attempts=5,
+        )
+        result = await trigger._forward_logs(logs_client, 
"/aws-glue/python-jobs/output", "jr_123", None)
+        assert result is None
+
+    @pytest.mark.asyncio
+    async def test_forward_logs_pagination(self, caplog):
+        """_forward_logs follows nextForwardToken and formats logs like the 
sync path."""
+        logs_client = AsyncMock()
+        logs_client.get_log_events = AsyncMock(
+            side_effect=[
+                {
+                    "events": [{"timestamp": 1, "message": "line 1\n"}],
+                    "nextForwardToken": "token_2",
+                },
+                {
+                    "events": [{"timestamp": 2, "message": "line 2\n"}],
+                    "nextForwardToken": "token_3",
+                },
+                {
+                    "events": [],
+                    "nextForwardToken": "token_3",
+                },
+            ]
+        )
+
+        trigger = GlueJobCompleteTrigger(
+            job_name="job_name",
+            run_id="jr_123",
+            verbose=True,
+            aws_conn_id="aws_conn_id",
+            waiter_delay=0,
+            waiter_max_attempts=5,
+        )
+        result = await trigger._forward_logs(logs_client, 
"/aws-glue/python-jobs/output", "jr_123", None)
+        assert result == "token_3"
+        assert logs_client.get_log_events.call_count == 3
+        # Verify log format matches sync path: "Glue Job Run <log_group> 
Logs:" with tab-indented lines
+        assert "Glue Job Run /aws-glue/python-jobs/output Logs:" in caplog.text
+        assert "\tline 1" in caplog.text
+        assert "\tline 2" in caplog.text
+
+    @pytest.mark.asyncio
+    async def test_forward_logs_no_new_events(self, caplog):
+        """_forward_logs logs 'No new log' when there are no events, matching 
sync path."""
+        logs_client = AsyncMock()
+        logs_client.get_log_events = AsyncMock(return_value={"events": [], 
"nextForwardToken": "token_1"})
+
+        trigger = GlueJobCompleteTrigger(
+            job_name="job_name",
+            run_id="jr_123",
+            verbose=True,
+            aws_conn_id="aws_conn_id",
+            waiter_delay=0,
+            waiter_max_attempts=5,
+        )
+        result = await trigger._forward_logs(logs_client, 
"/aws-glue/python-jobs/output", "jr_123", None)
+        assert result == "token_1"
+        assert "No new log from the Glue Job in /aws-glue/python-jobs/output" 
in caplog.text
+
 
 class TestGlueCatalogPartitionSensorTrigger:
     @pytest.mark.asyncio

Reply via email to