This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b086a22e80d fix(glue): Fix GlueJobOperator verbose logs not showing in
deferrable mode (#63086)
b086a22e80d is described below
commit b086a22e80dff25e82f2c80c54414a9b2f4008d5
Author: Shivam Rastogi <[email protected]>
AuthorDate: Fri Mar 27 16:37:26 2026 -0700
fix(glue): Fix GlueJobOperator verbose logs not showing in deferrable mode
(#63086)
* fix(glue): Add verbose CloudWatch log streaming for deferrable
GlueJobOperator
When using GlueJobOperator with deferrable=True and verbose=True, CloudWatch
logs were silently ignored because the trigger inherited the base waiter's
run() method, which only polls job status. This adds a run() override and a
_forward_logs() helper to the GlueJobCompleteTrigger that streams logs from
both the output and error CloudWatch log groups, matching the format used by
the synchronous path.
Extract get_glue_log_group_names() and format_glue_logs() as module-level
helpers in hooks/glue.py so that GlueJobHook.print_job_logs (sync) and
GlueJobCompleteTrigger._forward_logs (async) share identical log formatting
and log group name extraction logic.
closes: #56535
---------
Co-authored-by: Elad Kalif <[email protected]>
---
.../src/airflow/providers/amazon/aws/hooks/glue.py | 36 ++--
.../airflow/providers/amazon/aws/triggers/glue.py | 107 +++++++++++-
.../tests/unit/amazon/aws/triggers/test_glue.py | 193 +++++++++++++++++++++
3 files changed, 324 insertions(+), 12 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
b/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
index cc7cfd2849c..0e58d605689 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
@@ -42,6 +42,28 @@ DEFAULT_LOG_SUFFIX = "output"
ERROR_LOG_SUFFIX = "error"
+def get_glue_log_group_names(job_run: dict[str, Any]) -> tuple[str, str]:
+ """Extract the output and error CloudWatch log group names from a Glue job
run response."""
+ log_group_prefix = job_run["LogGroupName"]
+ return (
+ f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}",
+ f"{log_group_prefix}/{ERROR_LOG_SUFFIX}",
+ )
+
+
+def format_glue_logs(fetched_logs: list[str], log_group: str) -> str:
+ """
+ Format fetched CloudWatch log messages for display.
+
+ Shared between ``GlueJobHook.print_job_logs`` and
``GlueJobCompleteTrigger._forward_logs``
+ so that both the sync and async paths produce identical output.
+ """
+ if fetched_logs:
+ messages = "\t".join(line.rstrip() + "\n" for line in fetched_logs)
+ return f"Glue Job Run {log_group} Logs:\n\t{messages}"
+ return f"No new log from the Glue Job in {log_group}"
+
+
class GlueJobHook(AwsBaseHook):
"""
Interact with AWS Glue.
@@ -350,22 +372,14 @@ class GlueJobHook(AwsBaseHook):
else:
raise
- if len(fetched_logs):
- # Add a tab to indent those logs and distinguish them from
airflow logs.
- # Log lines returned already contain a newline character at
the end.
- messages = "\t".join(fetched_logs)
- self.log.info("Glue Job Run %s Logs:\n\t%s", log_group,
messages)
- else:
- self.log.info("No new log from the Glue Job in %s", log_group)
+ self.log.info(format_glue_logs(fetched_logs, log_group))
return next_token
- log_group_prefix = job_run["LogGroupName"]
- log_group_default = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
- log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
+ log_group_output, log_group_error = get_glue_log_group_names(job_run)
# one would think that the error log group would contain only errors,
but it actually contains
# a lot of interesting logs too, so it's valuable to have both
continuation_tokens.output_stream_continuation = display_logs_from(
- log_group_default, continuation_tokens.output_stream_continuation
+ log_group_output, continuation_tokens.output_stream_continuation
)
continuation_tokens.error_stream_continuation = display_logs_from(
log_group_error, continuation_tokens.error_stream_continuation
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py
b/providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py
index f54f761825e..e9d2e4ade04 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py
@@ -22,12 +22,21 @@ from collections.abc import AsyncIterator
from functools import cached_property
from typing import TYPE_CHECKING, Any
+from botocore.exceptions import ClientError
+
if TYPE_CHECKING:
from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
-from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook,
GlueJobHook
+from airflow.providers.amazon.aws.hooks.glue import (
+ GlueDataQualityHook,
+ GlueJobHook,
+ format_glue_logs,
+ get_glue_log_group_names,
+)
from airflow.providers.amazon.aws.hooks.glue_catalog import GlueCatalogHook
+from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger
+from airflow.providers.common.compat.sdk import AirflowException
from airflow.triggers.base import BaseTrigger, TriggerEvent
@@ -87,6 +96,102 @@ class GlueJobCompleteTrigger(AwsBaseWaiterTrigger):
config=self.botocore_config,
)
+ async def run(self) -> AsyncIterator[TriggerEvent]:
+ if not self.verbose:
+ async for event in super().run():
+ yield event
+ return
+
+ hook = self.hook()
+ async with (
+ await hook.get_async_conn() as glue_client,
+ await AwsLogsHook(
+ aws_conn_id=self.aws_conn_id, region_name=self.region_name
+ ).get_async_conn() as logs_client,
+ ):
+ # Get log group name from job run metadata and initial state in
one call
+ job_run_resp = await
glue_client.get_job_run(JobName=self.job_name, RunId=self.run_id)
+ log_group_output, log_group_error =
get_glue_log_group_names(job_run_resp["JobRun"])
+
+ output_token: str | None = None
+ error_token: str | None = None
+ job_run_state = job_run_resp["JobRun"]["JobRunState"]
+
+ for _attempt in range(self.attempts):
+ # Fetch and print logs from both output and error streams
+ output_token = await self._forward_logs(
+ logs_client, log_group_output, self.run_id, output_token
+ )
+ error_token = await self._forward_logs(logs_client,
log_group_error, self.run_id, error_token)
+
+ if job_run_state in ("FAILED", "TIMEOUT"):
+ raise AirflowException(
+ f"Glue Job {self.job_name} Run {self.run_id} exited
with state: {job_run_state}"
+ )
+ if job_run_state in ("SUCCEEDED", "STOPPED"):
+ self.log.info(
+ "Exiting Job %s Run %s State: %s",
+ self.job_name,
+ self.run_id,
+ job_run_state,
+ )
+ yield TriggerEvent({"status": "success", self.return_key:
self.return_value})
+ return
+
+ self.log.info(
+ "Polling for AWS Glue Job %s current run state: %s",
+ self.job_name,
+ job_run_state,
+ )
+ await asyncio.sleep(self.waiter_delay)
+
+ # Fetch updated state for next iteration
+ resp = await glue_client.get_job_run(JobName=self.job_name,
RunId=self.run_id)
+ job_run_state = resp["JobRun"]["JobRunState"]
+
+ raise AirflowException(
+ f"Glue Job {self.job_name} Run {self.run_id} waiter exceeded
max attempts ({self.attempts})"
+ )
+
+ async def _forward_logs(
+ self,
+ logs_client: Any,
+ log_group: str,
+ log_stream: str,
+ next_token: str | None,
+ ) -> str | None:
+ # Matches the format used by the synchronous
GlueJobHook.print_job_logs.
+ fetched_logs: list[str] = []
+ while True:
+ token_arg: dict[str, str] = {"nextToken": next_token} if
next_token else {}
+ try:
+ response = await logs_client.get_log_events(
+ logGroupName=log_group,
+ logStreamName=log_stream,
+ startFromHead=True,
+ **token_arg,
+ )
+ except ClientError as e:
+ if e.response["Error"]["Code"] == "ResourceNotFoundException":
+ self.log.warning(
+ "No new Glue driver logs so far.\n"
+ "If this persists, check the CloudWatch dashboard at:
%r.",
+
f"https://{self.region_name}.console.aws.amazon.com/cloudwatch/home",
+ )
+ return None
+ raise
+
+ events = response["events"]
+ fetched_logs.extend(event["message"] for event in events)
+
+ if not events or next_token == response["nextForwardToken"]:
+ break
+ next_token = response["nextForwardToken"]
+
+ self.log.info(format_glue_logs(fetched_logs, log_group))
+
+ return response.get("nextForwardToken")
+
class GlueCatalogPartitionTrigger(BaseTrigger):
"""
diff --git a/providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py
b/providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py
index 4339d36ce38..b40dcd2188a 100644
--- a/providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py
+++ b/providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py
@@ -21,9 +21,11 @@ from unittest import mock
from unittest.mock import AsyncMock
import pytest
+from botocore.exceptions import ClientError
from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook,
GlueJobHook
from airflow.providers.amazon.aws.hooks.glue_catalog import GlueCatalogHook
+from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
from airflow.providers.amazon.aws.triggers.glue import (
GlueCatalogPartitionTrigger,
GlueDataQualityRuleRecommendationRunCompleteTrigger,
@@ -111,6 +113,197 @@ class TestGlueJobTrigger:
"waiter_delay": 10,
}
+ def test_serialization_verbose(self):
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="JobRunId",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ waiter_max_attempts=3,
+ waiter_delay=10,
+ )
+ classpath, kwargs = trigger.serialize()
+ assert kwargs["verbose"] is True
+
+ @pytest.mark.asyncio
+ @mock.patch.object(AwsLogsHook, "get_async_conn")
+ @mock.patch.object(GlueJobHook, "get_async_conn")
+ async def test_verbose_run_success(self, mock_glue_conn, mock_logs_conn):
+ """When verbose=True, the trigger polls job state and fetches
CloudWatch logs."""
+ glue_client = AsyncMock()
+ glue_client.get_job_run = AsyncMock(
+ side_effect=[
+ # First call: metadata + initial state (RUNNING)
+ {"JobRun": {"JobRunState": "RUNNING", "LogGroupName":
"/aws-glue/python-jobs"}},
+ # Second call: state update after sleep (SUCCEEDED)
+ {"JobRun": {"JobRunState": "SUCCEEDED", "LogGroupName":
"/aws-glue/python-jobs"}},
+ ]
+ )
+ mock_glue_conn.return_value.__aenter__ =
AsyncMock(return_value=glue_client)
+ mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ logs_client = AsyncMock()
+ logs_client.get_log_events = AsyncMock(
+ return_value={
+ "events": [{"timestamp": 1234, "message": "Processing step
1\n"}],
+ "nextForwardToken": "token_1",
+ }
+ )
+ mock_logs_conn.return_value.__aenter__ =
AsyncMock(return_value=logs_client)
+ mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="jr_123",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ waiter_delay=0,
+ waiter_max_attempts=5,
+ )
+ generator = trigger.run()
+ event = await generator.asend(None)
+
+ assert event.payload["status"] == "success"
+ assert event.payload["run_id"] == "jr_123"
+ # Logs client was called for both output and error streams
+ assert logs_client.get_log_events.call_count >= 2
+
+ @pytest.mark.asyncio
+ @mock.patch.object(AwsLogsHook, "get_async_conn")
+ @mock.patch.object(GlueJobHook, "get_async_conn")
+ async def test_verbose_run_job_failed(self, mock_glue_conn,
mock_logs_conn):
+ """When verbose=True and the job fails, the trigger raises
AirflowException."""
+ glue_client = AsyncMock()
+ glue_client.get_job_run = AsyncMock(
+ return_value={"JobRun": {"JobRunState": "FAILED", "LogGroupName":
"/aws-glue/python-jobs"}}
+ )
+ mock_glue_conn.return_value.__aenter__ =
AsyncMock(return_value=glue_client)
+ mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ logs_client = AsyncMock()
+ logs_client.get_log_events = AsyncMock(return_value={"events": [],
"nextForwardToken": "token_1"})
+ mock_logs_conn.return_value.__aenter__ =
AsyncMock(return_value=logs_client)
+ mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="jr_123",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ waiter_delay=0,
+ waiter_max_attempts=5,
+ )
+ with pytest.raises(AirflowException, match="FAILED"):
+ async for _ in trigger.run():
+ pass
+
+ @pytest.mark.asyncio
+ @mock.patch.object(AwsLogsHook, "get_async_conn")
+ @mock.patch.object(GlueJobHook, "get_async_conn")
+ async def test_verbose_run_max_attempts(self, mock_glue_conn,
mock_logs_conn):
+ """When verbose=True and the job stays RUNNING past max attempts,
raises AirflowException."""
+ glue_client = AsyncMock()
+ glue_client.get_job_run = AsyncMock(
+ return_value={"JobRun": {"JobRunState": "RUNNING", "LogGroupName":
"/aws-glue/python-jobs"}}
+ )
+ mock_glue_conn.return_value.__aenter__ =
AsyncMock(return_value=glue_client)
+ mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ logs_client = AsyncMock()
+ logs_client.get_log_events = AsyncMock(return_value={"events": [],
"nextForwardToken": "token_1"})
+ mock_logs_conn.return_value.__aenter__ =
AsyncMock(return_value=logs_client)
+ mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="jr_123",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ waiter_delay=0,
+ waiter_max_attempts=2,
+ )
+ with pytest.raises(AirflowException, match="max attempts"):
+ async for _ in trigger.run():
+ pass
+
+ @pytest.mark.asyncio
+ async def test_forward_logs_resource_not_found(self):
+ """_forward_logs handles ResourceNotFoundException gracefully."""
+ logs_client = AsyncMock()
+ logs_client.get_log_events = AsyncMock(
+ side_effect=ClientError(
+ {"Error": {"Code": "ResourceNotFoundException", "Message":
"not found"}},
+ "GetLogEvents",
+ )
+ )
+
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="jr_123",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ region_name="us-east-1",
+ waiter_delay=0,
+ waiter_max_attempts=5,
+ )
+ result = await trigger._forward_logs(logs_client,
"/aws-glue/python-jobs/output", "jr_123", None)
+ assert result is None
+
+ @pytest.mark.asyncio
+ async def test_forward_logs_pagination(self, caplog):
+ """_forward_logs follows nextForwardToken and formats logs like the
sync path."""
+ logs_client = AsyncMock()
+ logs_client.get_log_events = AsyncMock(
+ side_effect=[
+ {
+ "events": [{"timestamp": 1, "message": "line 1\n"}],
+ "nextForwardToken": "token_2",
+ },
+ {
+ "events": [{"timestamp": 2, "message": "line 2\n"}],
+ "nextForwardToken": "token_3",
+ },
+ {
+ "events": [],
+ "nextForwardToken": "token_3",
+ },
+ ]
+ )
+
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="jr_123",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ waiter_delay=0,
+ waiter_max_attempts=5,
+ )
+ result = await trigger._forward_logs(logs_client,
"/aws-glue/python-jobs/output", "jr_123", None)
+ assert result == "token_3"
+ assert logs_client.get_log_events.call_count == 3
+ # Verify log format matches sync path: "Glue Job Run <log_group>
Logs:" with tab-indented lines
+ assert "Glue Job Run /aws-glue/python-jobs/output Logs:" in caplog.text
+ assert "\tline 1" in caplog.text
+ assert "\tline 2" in caplog.text
+
+ @pytest.mark.asyncio
+ async def test_forward_logs_no_new_events(self, caplog):
+ """_forward_logs logs 'No new log' when there are no events, matching
sync path."""
+ logs_client = AsyncMock()
+ logs_client.get_log_events = AsyncMock(return_value={"events": [],
"nextForwardToken": "token_1"})
+
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="jr_123",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ waiter_delay=0,
+ waiter_max_attempts=5,
+ )
+ result = await trigger._forward_logs(logs_client,
"/aws-glue/python-jobs/output", "jr_123", None)
+ assert result == "token_1"
+ assert "No new log from the Glue Job in /aws-glue/python-jobs/output"
in caplog.text
+
class TestGlueCatalogPartitionSensorTrigger:
@pytest.mark.asyncio