This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 48685e7715d Revert "fix(glue): Fix GlueJobOperator verbose logs not showing in deferrable mode (#63086)" (#64340)
48685e7715d is described below

commit 48685e7715d02c643d73e39a3b1245b5fa7eb519
Author: Niko Oliveira <[email protected]>
AuthorDate: Fri Mar 27 19:06:21 2026 -0700

    Revert "fix(glue): Fix GlueJobOperator verbose logs not showing in deferrable mode (#63086)" (#64340)
    
    This reverts commit b086a22e80dff25e82f2c80c54414a9b2f4008d5.
---
 .../src/airflow/providers/amazon/aws/hooks/glue.py |  36 ++--
 .../airflow/providers/amazon/aws/triggers/glue.py  | 107 +-----------
 .../tests/unit/amazon/aws/triggers/test_glue.py    | 193 ---------------------
 3 files changed, 12 insertions(+), 324 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py 
b/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
index 0e58d605689..cc7cfd2849c 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
@@ -42,28 +42,6 @@ DEFAULT_LOG_SUFFIX = "output"
 ERROR_LOG_SUFFIX = "error"
 
 
-def get_glue_log_group_names(job_run: dict[str, Any]) -> tuple[str, str]:
-    """Extract the output and error CloudWatch log group names from a Glue job 
run response."""
-    log_group_prefix = job_run["LogGroupName"]
-    return (
-        f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}",
-        f"{log_group_prefix}/{ERROR_LOG_SUFFIX}",
-    )
-
-
-def format_glue_logs(fetched_logs: list[str], log_group: str) -> str:
-    """
-    Format fetched CloudWatch log messages for display.
-
-    Shared between ``GlueJobHook.print_job_logs`` and 
``GlueJobCompleteTrigger._forward_logs``
-    so that both the sync and async paths produce identical output.
-    """
-    if fetched_logs:
-        messages = "\t".join(line.rstrip() + "\n" for line in fetched_logs)
-        return f"Glue Job Run {log_group} Logs:\n\t{messages}"
-    return f"No new log from the Glue Job in {log_group}"
-
-
 class GlueJobHook(AwsBaseHook):
     """
     Interact with AWS Glue.
@@ -372,14 +350,22 @@ class GlueJobHook(AwsBaseHook):
                 else:
                     raise
 
-            self.log.info(format_glue_logs(fetched_logs, log_group))
+            if len(fetched_logs):
+                # Add a tab to indent those logs and distinguish them from 
airflow logs.
+                # Log lines returned already contain a newline character at 
the end.
+                messages = "\t".join(fetched_logs)
+                self.log.info("Glue Job Run %s Logs:\n\t%s", log_group, 
messages)
+            else:
+                self.log.info("No new log from the Glue Job in %s", log_group)
             return next_token
 
-        log_group_output, log_group_error = get_glue_log_group_names(job_run)
+        log_group_prefix = job_run["LogGroupName"]
+        log_group_default = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
+        log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
         # one would think that the error log group would contain only errors, 
but it actually contains
         # a lot of interesting logs too, so it's valuable to have both
         continuation_tokens.output_stream_continuation = display_logs_from(
-            log_group_output, continuation_tokens.output_stream_continuation
+            log_group_default, continuation_tokens.output_stream_continuation
         )
         continuation_tokens.error_stream_continuation = display_logs_from(
             log_group_error, continuation_tokens.error_stream_continuation
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py 
b/providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py
index e9d2e4ade04..f54f761825e 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py
@@ -22,21 +22,12 @@ from collections.abc import AsyncIterator
 from functools import cached_property
 from typing import TYPE_CHECKING, Any
 
-from botocore.exceptions import ClientError
-
 if TYPE_CHECKING:
     from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
 
-from airflow.providers.amazon.aws.hooks.glue import (
-    GlueDataQualityHook,
-    GlueJobHook,
-    format_glue_logs,
-    get_glue_log_group_names,
-)
+from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook, 
GlueJobHook
 from airflow.providers.amazon.aws.hooks.glue_catalog import GlueCatalogHook
-from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
 from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger
-from airflow.providers.common.compat.sdk import AirflowException
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 
 
@@ -96,102 +87,6 @@ class GlueJobCompleteTrigger(AwsBaseWaiterTrigger):
             config=self.botocore_config,
         )
 
-    async def run(self) -> AsyncIterator[TriggerEvent]:
-        if not self.verbose:
-            async for event in super().run():
-                yield event
-            return
-
-        hook = self.hook()
-        async with (
-            await hook.get_async_conn() as glue_client,
-            await AwsLogsHook(
-                aws_conn_id=self.aws_conn_id, region_name=self.region_name
-            ).get_async_conn() as logs_client,
-        ):
-            # Get log group name from job run metadata and initial state in 
one call
-            job_run_resp = await 
glue_client.get_job_run(JobName=self.job_name, RunId=self.run_id)
-            log_group_output, log_group_error = 
get_glue_log_group_names(job_run_resp["JobRun"])
-
-            output_token: str | None = None
-            error_token: str | None = None
-            job_run_state = job_run_resp["JobRun"]["JobRunState"]
-
-            for _attempt in range(self.attempts):
-                # Fetch and print logs from both output and error streams
-                output_token = await self._forward_logs(
-                    logs_client, log_group_output, self.run_id, output_token
-                )
-                error_token = await self._forward_logs(logs_client, 
log_group_error, self.run_id, error_token)
-
-                if job_run_state in ("FAILED", "TIMEOUT"):
-                    raise AirflowException(
-                        f"Glue Job {self.job_name} Run {self.run_id} exited 
with state: {job_run_state}"
-                    )
-                if job_run_state in ("SUCCEEDED", "STOPPED"):
-                    self.log.info(
-                        "Exiting Job %s Run %s State: %s",
-                        self.job_name,
-                        self.run_id,
-                        job_run_state,
-                    )
-                    yield TriggerEvent({"status": "success", self.return_key: 
self.return_value})
-                    return
-
-                self.log.info(
-                    "Polling for AWS Glue Job %s current run state: %s",
-                    self.job_name,
-                    job_run_state,
-                )
-                await asyncio.sleep(self.waiter_delay)
-
-                # Fetch updated state for next iteration
-                resp = await glue_client.get_job_run(JobName=self.job_name, 
RunId=self.run_id)
-                job_run_state = resp["JobRun"]["JobRunState"]
-
-            raise AirflowException(
-                f"Glue Job {self.job_name} Run {self.run_id} waiter exceeded 
max attempts ({self.attempts})"
-            )
-
-    async def _forward_logs(
-        self,
-        logs_client: Any,
-        log_group: str,
-        log_stream: str,
-        next_token: str | None,
-    ) -> str | None:
-        # Matches the format used by the synchronous 
GlueJobHook.print_job_logs.
-        fetched_logs: list[str] = []
-        while True:
-            token_arg: dict[str, str] = {"nextToken": next_token} if 
next_token else {}
-            try:
-                response = await logs_client.get_log_events(
-                    logGroupName=log_group,
-                    logStreamName=log_stream,
-                    startFromHead=True,
-                    **token_arg,
-                )
-            except ClientError as e:
-                if e.response["Error"]["Code"] == "ResourceNotFoundException":
-                    self.log.warning(
-                        "No new Glue driver logs so far.\n"
-                        "If this persists, check the CloudWatch dashboard at: 
%r.",
-                        
f"https://{self.region_name}.console.aws.amazon.com/cloudwatch/home";,
-                    )
-                    return None
-                raise
-
-            events = response["events"]
-            fetched_logs.extend(event["message"] for event in events)
-
-            if not events or next_token == response["nextForwardToken"]:
-                break
-            next_token = response["nextForwardToken"]
-
-        self.log.info(format_glue_logs(fetched_logs, log_group))
-
-        return response.get("nextForwardToken")
-
 
 class GlueCatalogPartitionTrigger(BaseTrigger):
     """
diff --git a/providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py 
b/providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py
index b40dcd2188a..4339d36ce38 100644
--- a/providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py
+++ b/providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py
@@ -21,11 +21,9 @@ from unittest import mock
 from unittest.mock import AsyncMock
 
 import pytest
-from botocore.exceptions import ClientError
 
 from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook, 
GlueJobHook
 from airflow.providers.amazon.aws.hooks.glue_catalog import GlueCatalogHook
-from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
 from airflow.providers.amazon.aws.triggers.glue import (
     GlueCatalogPartitionTrigger,
     GlueDataQualityRuleRecommendationRunCompleteTrigger,
@@ -113,197 +111,6 @@ class TestGlueJobTrigger:
             "waiter_delay": 10,
         }
 
-    def test_serialization_verbose(self):
-        trigger = GlueJobCompleteTrigger(
-            job_name="job_name",
-            run_id="JobRunId",
-            verbose=True,
-            aws_conn_id="aws_conn_id",
-            waiter_max_attempts=3,
-            waiter_delay=10,
-        )
-        classpath, kwargs = trigger.serialize()
-        assert kwargs["verbose"] is True
-
-    @pytest.mark.asyncio
-    @mock.patch.object(AwsLogsHook, "get_async_conn")
-    @mock.patch.object(GlueJobHook, "get_async_conn")
-    async def test_verbose_run_success(self, mock_glue_conn, mock_logs_conn):
-        """When verbose=True, the trigger polls job state and fetches 
CloudWatch logs."""
-        glue_client = AsyncMock()
-        glue_client.get_job_run = AsyncMock(
-            side_effect=[
-                # First call: metadata + initial state (RUNNING)
-                {"JobRun": {"JobRunState": "RUNNING", "LogGroupName": 
"/aws-glue/python-jobs"}},
-                # Second call: state update after sleep (SUCCEEDED)
-                {"JobRun": {"JobRunState": "SUCCEEDED", "LogGroupName": 
"/aws-glue/python-jobs"}},
-            ]
-        )
-        mock_glue_conn.return_value.__aenter__ = 
AsyncMock(return_value=glue_client)
-        mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
-
-        logs_client = AsyncMock()
-        logs_client.get_log_events = AsyncMock(
-            return_value={
-                "events": [{"timestamp": 1234, "message": "Processing step 
1\n"}],
-                "nextForwardToken": "token_1",
-            }
-        )
-        mock_logs_conn.return_value.__aenter__ = 
AsyncMock(return_value=logs_client)
-        mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
-
-        trigger = GlueJobCompleteTrigger(
-            job_name="job_name",
-            run_id="jr_123",
-            verbose=True,
-            aws_conn_id="aws_conn_id",
-            waiter_delay=0,
-            waiter_max_attempts=5,
-        )
-        generator = trigger.run()
-        event = await generator.asend(None)
-
-        assert event.payload["status"] == "success"
-        assert event.payload["run_id"] == "jr_123"
-        # Logs client was called for both output and error streams
-        assert logs_client.get_log_events.call_count >= 2
-
-    @pytest.mark.asyncio
-    @mock.patch.object(AwsLogsHook, "get_async_conn")
-    @mock.patch.object(GlueJobHook, "get_async_conn")
-    async def test_verbose_run_job_failed(self, mock_glue_conn, 
mock_logs_conn):
-        """When verbose=True and the job fails, the trigger raises 
AirflowException."""
-        glue_client = AsyncMock()
-        glue_client.get_job_run = AsyncMock(
-            return_value={"JobRun": {"JobRunState": "FAILED", "LogGroupName": 
"/aws-glue/python-jobs"}}
-        )
-        mock_glue_conn.return_value.__aenter__ = 
AsyncMock(return_value=glue_client)
-        mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
-
-        logs_client = AsyncMock()
-        logs_client.get_log_events = AsyncMock(return_value={"events": [], 
"nextForwardToken": "token_1"})
-        mock_logs_conn.return_value.__aenter__ = 
AsyncMock(return_value=logs_client)
-        mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
-
-        trigger = GlueJobCompleteTrigger(
-            job_name="job_name",
-            run_id="jr_123",
-            verbose=True,
-            aws_conn_id="aws_conn_id",
-            waiter_delay=0,
-            waiter_max_attempts=5,
-        )
-        with pytest.raises(AirflowException, match="FAILED"):
-            async for _ in trigger.run():
-                pass
-
-    @pytest.mark.asyncio
-    @mock.patch.object(AwsLogsHook, "get_async_conn")
-    @mock.patch.object(GlueJobHook, "get_async_conn")
-    async def test_verbose_run_max_attempts(self, mock_glue_conn, 
mock_logs_conn):
-        """When verbose=True and the job stays RUNNING past max attempts, 
raises AirflowException."""
-        glue_client = AsyncMock()
-        glue_client.get_job_run = AsyncMock(
-            return_value={"JobRun": {"JobRunState": "RUNNING", "LogGroupName": 
"/aws-glue/python-jobs"}}
-        )
-        mock_glue_conn.return_value.__aenter__ = 
AsyncMock(return_value=glue_client)
-        mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
-
-        logs_client = AsyncMock()
-        logs_client.get_log_events = AsyncMock(return_value={"events": [], 
"nextForwardToken": "token_1"})
-        mock_logs_conn.return_value.__aenter__ = 
AsyncMock(return_value=logs_client)
-        mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
-
-        trigger = GlueJobCompleteTrigger(
-            job_name="job_name",
-            run_id="jr_123",
-            verbose=True,
-            aws_conn_id="aws_conn_id",
-            waiter_delay=0,
-            waiter_max_attempts=2,
-        )
-        with pytest.raises(AirflowException, match="max attempts"):
-            async for _ in trigger.run():
-                pass
-
-    @pytest.mark.asyncio
-    async def test_forward_logs_resource_not_found(self):
-        """_forward_logs handles ResourceNotFoundException gracefully."""
-        logs_client = AsyncMock()
-        logs_client.get_log_events = AsyncMock(
-            side_effect=ClientError(
-                {"Error": {"Code": "ResourceNotFoundException", "Message": 
"not found"}},
-                "GetLogEvents",
-            )
-        )
-
-        trigger = GlueJobCompleteTrigger(
-            job_name="job_name",
-            run_id="jr_123",
-            verbose=True,
-            aws_conn_id="aws_conn_id",
-            region_name="us-east-1",
-            waiter_delay=0,
-            waiter_max_attempts=5,
-        )
-        result = await trigger._forward_logs(logs_client, 
"/aws-glue/python-jobs/output", "jr_123", None)
-        assert result is None
-
-    @pytest.mark.asyncio
-    async def test_forward_logs_pagination(self, caplog):
-        """_forward_logs follows nextForwardToken and formats logs like the 
sync path."""
-        logs_client = AsyncMock()
-        logs_client.get_log_events = AsyncMock(
-            side_effect=[
-                {
-                    "events": [{"timestamp": 1, "message": "line 1\n"}],
-                    "nextForwardToken": "token_2",
-                },
-                {
-                    "events": [{"timestamp": 2, "message": "line 2\n"}],
-                    "nextForwardToken": "token_3",
-                },
-                {
-                    "events": [],
-                    "nextForwardToken": "token_3",
-                },
-            ]
-        )
-
-        trigger = GlueJobCompleteTrigger(
-            job_name="job_name",
-            run_id="jr_123",
-            verbose=True,
-            aws_conn_id="aws_conn_id",
-            waiter_delay=0,
-            waiter_max_attempts=5,
-        )
-        result = await trigger._forward_logs(logs_client, 
"/aws-glue/python-jobs/output", "jr_123", None)
-        assert result == "token_3"
-        assert logs_client.get_log_events.call_count == 3
-        # Verify log format matches sync path: "Glue Job Run <log_group> 
Logs:" with tab-indented lines
-        assert "Glue Job Run /aws-glue/python-jobs/output Logs:" in caplog.text
-        assert "\tline 1" in caplog.text
-        assert "\tline 2" in caplog.text
-
-    @pytest.mark.asyncio
-    async def test_forward_logs_no_new_events(self, caplog):
-        """_forward_logs logs 'No new log' when there are no events, matching 
sync path."""
-        logs_client = AsyncMock()
-        logs_client.get_log_events = AsyncMock(return_value={"events": [], 
"nextForwardToken": "token_1"})
-
-        trigger = GlueJobCompleteTrigger(
-            job_name="job_name",
-            run_id="jr_123",
-            verbose=True,
-            aws_conn_id="aws_conn_id",
-            waiter_delay=0,
-            waiter_max_attempts=5,
-        )
-        result = await trigger._forward_logs(logs_client, 
"/aws-glue/python-jobs/output", "jr_123", None)
-        assert result == "token_1"
-        assert "No new log from the Glue Job in /aws-glue/python-jobs/output" 
in caplog.text
-
 
 class TestGlueCatalogPartitionSensorTrigger:
     @pytest.mark.asyncio

Reply via email to