This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 48685e7715d Revert "fix(glue): Fix GlueJobOperator verbose logs not
showing in deferrable mode (#63086)" (#64340)
48685e7715d is described below
commit 48685e7715d02c643d73e39a3b1245b5fa7eb519
Author: Niko Oliveira <[email protected]>
AuthorDate: Fri Mar 27 19:06:21 2026 -0700
Revert "fix(glue): Fix GlueJobOperator verbose logs not showing in
deferrable mode (#63086)" (#64340)
This reverts commit b086a22e80dff25e82f2c80c54414a9b2f4008d5.
---
.../src/airflow/providers/amazon/aws/hooks/glue.py | 36 ++--
.../airflow/providers/amazon/aws/triggers/glue.py | 107 +-----------
.../tests/unit/amazon/aws/triggers/test_glue.py | 193 ---------------------
3 files changed, 12 insertions(+), 324 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
index 0e58d605689..cc7cfd2849c 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
@@ -42,28 +42,6 @@ DEFAULT_LOG_SUFFIX = "output"
ERROR_LOG_SUFFIX = "error"
-def get_glue_log_group_names(job_run: dict[str, Any]) -> tuple[str, str]:
- """Extract the output and error CloudWatch log group names from a Glue job
run response."""
- log_group_prefix = job_run["LogGroupName"]
- return (
- f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}",
- f"{log_group_prefix}/{ERROR_LOG_SUFFIX}",
- )
-
-
-def format_glue_logs(fetched_logs: list[str], log_group: str) -> str:
- """
- Format fetched CloudWatch log messages for display.
-
- Shared between ``GlueJobHook.print_job_logs`` and ``GlueJobCompleteTrigger._forward_logs``
- so that both the sync and async paths produce identical output.
- """
- if fetched_logs:
- messages = "\t".join(line.rstrip() + "\n" for line in fetched_logs)
- return f"Glue Job Run {log_group} Logs:\n\t{messages}"
- return f"No new log from the Glue Job in {log_group}"
-
-
class GlueJobHook(AwsBaseHook):
"""
Interact with AWS Glue.
@@ -372,14 +350,22 @@ class GlueJobHook(AwsBaseHook):
else:
raise
- self.log.info(format_glue_logs(fetched_logs, log_group))
+ if len(fetched_logs):
+ # Add a tab to indent those logs and distinguish them from airflow logs.
+ # Log lines returned already contain a newline character at the end.
+ messages = "\t".join(fetched_logs)
+ self.log.info("Glue Job Run %s Logs:\n\t%s", log_group, messages)
+ else:
+ self.log.info("No new log from the Glue Job in %s", log_group)
return next_token
- log_group_output, log_group_error = get_glue_log_group_names(job_run)
+ log_group_prefix = job_run["LogGroupName"]
+ log_group_default = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
+ log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
# one would think that the error log group would contain only errors, but it actually contains
# a lot of interesting logs too, so it's valuable to have both
continuation_tokens.output_stream_continuation = display_logs_from(
- log_group_output, continuation_tokens.output_stream_continuation
+ log_group_default, continuation_tokens.output_stream_continuation
)
continuation_tokens.error_stream_continuation = display_logs_from(
log_group_error, continuation_tokens.error_stream_continuation
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py b/providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py
index e9d2e4ade04..f54f761825e 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py
@@ -22,21 +22,12 @@ from collections.abc import AsyncIterator
from functools import cached_property
from typing import TYPE_CHECKING, Any
-from botocore.exceptions import ClientError
-
if TYPE_CHECKING:
from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
-from airflow.providers.amazon.aws.hooks.glue import (
- GlueDataQualityHook,
- GlueJobHook,
- format_glue_logs,
- get_glue_log_group_names,
-)
+from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook, GlueJobHook
from airflow.providers.amazon.aws.hooks.glue_catalog import GlueCatalogHook
-from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger
-from airflow.providers.common.compat.sdk import AirflowException
from airflow.triggers.base import BaseTrigger, TriggerEvent
@@ -96,102 +87,6 @@ class GlueJobCompleteTrigger(AwsBaseWaiterTrigger):
config=self.botocore_config,
)
- async def run(self) -> AsyncIterator[TriggerEvent]:
- if not self.verbose:
- async for event in super().run():
- yield event
- return
-
- hook = self.hook()
- async with (
- await hook.get_async_conn() as glue_client,
- await AwsLogsHook(
- aws_conn_id=self.aws_conn_id, region_name=self.region_name
- ).get_async_conn() as logs_client,
- ):
- # Get log group name from job run metadata and initial state in
one call
- job_run_resp = await
glue_client.get_job_run(JobName=self.job_name, RunId=self.run_id)
- log_group_output, log_group_error =
get_glue_log_group_names(job_run_resp["JobRun"])
-
- output_token: str | None = None
- error_token: str | None = None
- job_run_state = job_run_resp["JobRun"]["JobRunState"]
-
- for _attempt in range(self.attempts):
- # Fetch and print logs from both output and error streams
- output_token = await self._forward_logs(
- logs_client, log_group_output, self.run_id, output_token
- )
- error_token = await self._forward_logs(logs_client,
log_group_error, self.run_id, error_token)
-
- if job_run_state in ("FAILED", "TIMEOUT"):
- raise AirflowException(
- f"Glue Job {self.job_name} Run {self.run_id} exited
with state: {job_run_state}"
- )
- if job_run_state in ("SUCCEEDED", "STOPPED"):
- self.log.info(
- "Exiting Job %s Run %s State: %s",
- self.job_name,
- self.run_id,
- job_run_state,
- )
- yield TriggerEvent({"status": "success", self.return_key:
self.return_value})
- return
-
- self.log.info(
- "Polling for AWS Glue Job %s current run state: %s",
- self.job_name,
- job_run_state,
- )
- await asyncio.sleep(self.waiter_delay)
-
- # Fetch updated state for next iteration
- resp = await glue_client.get_job_run(JobName=self.job_name,
RunId=self.run_id)
- job_run_state = resp["JobRun"]["JobRunState"]
-
- raise AirflowException(
- f"Glue Job {self.job_name} Run {self.run_id} waiter exceeded
max attempts ({self.attempts})"
- )
-
- async def _forward_logs(
- self,
- logs_client: Any,
- log_group: str,
- log_stream: str,
- next_token: str | None,
- ) -> str | None:
- # Matches the format used by the synchronous
GlueJobHook.print_job_logs.
- fetched_logs: list[str] = []
- while True:
- token_arg: dict[str, str] = {"nextToken": next_token} if
next_token else {}
- try:
- response = await logs_client.get_log_events(
- logGroupName=log_group,
- logStreamName=log_stream,
- startFromHead=True,
- **token_arg,
- )
- except ClientError as e:
- if e.response["Error"]["Code"] == "ResourceNotFoundException":
- self.log.warning(
- "No new Glue driver logs so far.\n"
- "If this persists, check the CloudWatch dashboard at:
%r.",
-
f"https://{self.region_name}.console.aws.amazon.com/cloudwatch/home",
- )
- return None
- raise
-
- events = response["events"]
- fetched_logs.extend(event["message"] for event in events)
-
- if not events or next_token == response["nextForwardToken"]:
- break
- next_token = response["nextForwardToken"]
-
- self.log.info(format_glue_logs(fetched_logs, log_group))
-
- return response.get("nextForwardToken")
-
class GlueCatalogPartitionTrigger(BaseTrigger):
"""
diff --git a/providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py b/providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py
index b40dcd2188a..4339d36ce38 100644
--- a/providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py
+++ b/providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py
@@ -21,11 +21,9 @@ from unittest import mock
from unittest.mock import AsyncMock
import pytest
-from botocore.exceptions import ClientError
from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook,
GlueJobHook
from airflow.providers.amazon.aws.hooks.glue_catalog import GlueCatalogHook
-from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
from airflow.providers.amazon.aws.triggers.glue import (
GlueCatalogPartitionTrigger,
GlueDataQualityRuleRecommendationRunCompleteTrigger,
@@ -113,197 +111,6 @@ class TestGlueJobTrigger:
"waiter_delay": 10,
}
- def test_serialization_verbose(self):
- trigger = GlueJobCompleteTrigger(
- job_name="job_name",
- run_id="JobRunId",
- verbose=True,
- aws_conn_id="aws_conn_id",
- waiter_max_attempts=3,
- waiter_delay=10,
- )
- classpath, kwargs = trigger.serialize()
- assert kwargs["verbose"] is True
-
- @pytest.mark.asyncio
- @mock.patch.object(AwsLogsHook, "get_async_conn")
- @mock.patch.object(GlueJobHook, "get_async_conn")
- async def test_verbose_run_success(self, mock_glue_conn, mock_logs_conn):
- """When verbose=True, the trigger polls job state and fetches
CloudWatch logs."""
- glue_client = AsyncMock()
- glue_client.get_job_run = AsyncMock(
- side_effect=[
- # First call: metadata + initial state (RUNNING)
- {"JobRun": {"JobRunState": "RUNNING", "LogGroupName":
"/aws-glue/python-jobs"}},
- # Second call: state update after sleep (SUCCEEDED)
- {"JobRun": {"JobRunState": "SUCCEEDED", "LogGroupName":
"/aws-glue/python-jobs"}},
- ]
- )
- mock_glue_conn.return_value.__aenter__ =
AsyncMock(return_value=glue_client)
- mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
-
- logs_client = AsyncMock()
- logs_client.get_log_events = AsyncMock(
- return_value={
- "events": [{"timestamp": 1234, "message": "Processing step
1\n"}],
- "nextForwardToken": "token_1",
- }
- )
- mock_logs_conn.return_value.__aenter__ =
AsyncMock(return_value=logs_client)
- mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
-
- trigger = GlueJobCompleteTrigger(
- job_name="job_name",
- run_id="jr_123",
- verbose=True,
- aws_conn_id="aws_conn_id",
- waiter_delay=0,
- waiter_max_attempts=5,
- )
- generator = trigger.run()
- event = await generator.asend(None)
-
- assert event.payload["status"] == "success"
- assert event.payload["run_id"] == "jr_123"
- # Logs client was called for both output and error streams
- assert logs_client.get_log_events.call_count >= 2
-
- @pytest.mark.asyncio
- @mock.patch.object(AwsLogsHook, "get_async_conn")
- @mock.patch.object(GlueJobHook, "get_async_conn")
- async def test_verbose_run_job_failed(self, mock_glue_conn,
mock_logs_conn):
- """When verbose=True and the job fails, the trigger raises
AirflowException."""
- glue_client = AsyncMock()
- glue_client.get_job_run = AsyncMock(
- return_value={"JobRun": {"JobRunState": "FAILED", "LogGroupName":
"/aws-glue/python-jobs"}}
- )
- mock_glue_conn.return_value.__aenter__ =
AsyncMock(return_value=glue_client)
- mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
-
- logs_client = AsyncMock()
- logs_client.get_log_events = AsyncMock(return_value={"events": [],
"nextForwardToken": "token_1"})
- mock_logs_conn.return_value.__aenter__ =
AsyncMock(return_value=logs_client)
- mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
-
- trigger = GlueJobCompleteTrigger(
- job_name="job_name",
- run_id="jr_123",
- verbose=True,
- aws_conn_id="aws_conn_id",
- waiter_delay=0,
- waiter_max_attempts=5,
- )
- with pytest.raises(AirflowException, match="FAILED"):
- async for _ in trigger.run():
- pass
-
- @pytest.mark.asyncio
- @mock.patch.object(AwsLogsHook, "get_async_conn")
- @mock.patch.object(GlueJobHook, "get_async_conn")
- async def test_verbose_run_max_attempts(self, mock_glue_conn,
mock_logs_conn):
- """When verbose=True and the job stays RUNNING past max attempts,
raises AirflowException."""
- glue_client = AsyncMock()
- glue_client.get_job_run = AsyncMock(
- return_value={"JobRun": {"JobRunState": "RUNNING", "LogGroupName":
"/aws-glue/python-jobs"}}
- )
- mock_glue_conn.return_value.__aenter__ =
AsyncMock(return_value=glue_client)
- mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
-
- logs_client = AsyncMock()
- logs_client.get_log_events = AsyncMock(return_value={"events": [],
"nextForwardToken": "token_1"})
- mock_logs_conn.return_value.__aenter__ =
AsyncMock(return_value=logs_client)
- mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
-
- trigger = GlueJobCompleteTrigger(
- job_name="job_name",
- run_id="jr_123",
- verbose=True,
- aws_conn_id="aws_conn_id",
- waiter_delay=0,
- waiter_max_attempts=2,
- )
- with pytest.raises(AirflowException, match="max attempts"):
- async for _ in trigger.run():
- pass
-
- @pytest.mark.asyncio
- async def test_forward_logs_resource_not_found(self):
- """_forward_logs handles ResourceNotFoundException gracefully."""
- logs_client = AsyncMock()
- logs_client.get_log_events = AsyncMock(
- side_effect=ClientError(
- {"Error": {"Code": "ResourceNotFoundException", "Message":
"not found"}},
- "GetLogEvents",
- )
- )
-
- trigger = GlueJobCompleteTrigger(
- job_name="job_name",
- run_id="jr_123",
- verbose=True,
- aws_conn_id="aws_conn_id",
- region_name="us-east-1",
- waiter_delay=0,
- waiter_max_attempts=5,
- )
- result = await trigger._forward_logs(logs_client,
"/aws-glue/python-jobs/output", "jr_123", None)
- assert result is None
-
- @pytest.mark.asyncio
- async def test_forward_logs_pagination(self, caplog):
- """_forward_logs follows nextForwardToken and formats logs like the
sync path."""
- logs_client = AsyncMock()
- logs_client.get_log_events = AsyncMock(
- side_effect=[
- {
- "events": [{"timestamp": 1, "message": "line 1\n"}],
- "nextForwardToken": "token_2",
- },
- {
- "events": [{"timestamp": 2, "message": "line 2\n"}],
- "nextForwardToken": "token_3",
- },
- {
- "events": [],
- "nextForwardToken": "token_3",
- },
- ]
- )
-
- trigger = GlueJobCompleteTrigger(
- job_name="job_name",
- run_id="jr_123",
- verbose=True,
- aws_conn_id="aws_conn_id",
- waiter_delay=0,
- waiter_max_attempts=5,
- )
- result = await trigger._forward_logs(logs_client,
"/aws-glue/python-jobs/output", "jr_123", None)
- assert result == "token_3"
- assert logs_client.get_log_events.call_count == 3
- # Verify log format matches sync path: "Glue Job Run <log_group>
Logs:" with tab-indented lines
- assert "Glue Job Run /aws-glue/python-jobs/output Logs:" in caplog.text
- assert "\tline 1" in caplog.text
- assert "\tline 2" in caplog.text
-
- @pytest.mark.asyncio
- async def test_forward_logs_no_new_events(self, caplog):
- """_forward_logs logs 'No new log' when there are no events, matching
sync path."""
- logs_client = AsyncMock()
- logs_client.get_log_events = AsyncMock(return_value={"events": [],
"nextForwardToken": "token_1"})
-
- trigger = GlueJobCompleteTrigger(
- job_name="job_name",
- run_id="jr_123",
- verbose=True,
- aws_conn_id="aws_conn_id",
- waiter_delay=0,
- waiter_max_attempts=5,
- )
- result = await trigger._forward_logs(logs_client,
"/aws-glue/python-jobs/output", "jr_123", None)
- assert result == "token_1"
- assert "No new log from the Glue Job in /aws-glue/python-jobs/output"
in caplog.text
-
class TestGlueCatalogPartitionSensorTrigger:
@pytest.mark.asyncio