This is an automated email from the ASF dual-hosted git repository.

Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3fc2f02fa8b Fix GlueJobHook failing to update a Glue job that has tags (#68711)
3fc2f02fa8b is described below

commit 3fc2f02fa8baab99296f13f275bb2200504904db
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Jun 19 22:57:47 2026 +0900

    Fix GlueJobHook failing to update a Glue job that has tags (#68711)
    
    Signed-off-by: PoAn Yang <[email protected]>
---
 .../src/airflow/providers/amazon/aws/hooks/glue.py | 32 +++++++++-
 .../tests/unit/amazon/aws/hooks/test_glue.py       | 69 ++++++++++++++++++++++
 2 files changed, 100 insertions(+), 1 deletion(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
index 0e58d605689..6656c3fedb3 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py
@@ -36,6 +36,7 @@ from tenacity import (
 from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
+from airflow.providers.amazon.aws.hooks.sts import StsHook
 from airflow.providers.common.compat.sdk import AirflowException
 
 DEFAULT_LOG_SUFFIX = "output"
@@ -485,6 +486,8 @@ class GlueJobHook(AwsBaseHook):
         :return: True if job was updated and false otherwise
         """
         job_name = job_kwargs.pop("Name")
+        # Glue ``update_job`` does not accept ``Tags`` in ``JobUpdate``; reconcile them separately.
+        tags_updated = self.update_tags(job_name, job_kwargs.pop("Tags")) if "Tags" in job_kwargs else False
         current_job = self.conn.get_job(JobName=job_name)["Job"]
 
         update_config = {
@@ -495,7 +498,34 @@ class GlueJobHook(AwsBaseHook):
             self.conn.update_job(JobName=job_name, JobUpdate=job_kwargs)
             self.log.info("Updated configurations: %s", update_config)
             return True
-        return False
+        return tags_updated
+
+    def update_tags(self, job_name: str, job_tags: dict) -> bool:
+        """
+        Reconcile a job's tags with the desired set.
+
+        Glue manages tags outside of ``update_job``, so adds/updates go through
+        ``tag_resource`` and removals through ``untag_resource``.
+
+        .. seealso::
+            - :external+boto3:py:meth:`Glue.Client.tag_resource`
+            - :external+boto3:py:meth:`Glue.Client.untag_resource`
+
+        :param job_name: Name of the job for which to update tags
+        :param job_tags: Desired tags. Keys absent from this mapping are removed from the job.
+        :return: True if any tag was added, changed, or removed, False otherwise
+        """
+        account_number = StsHook(aws_conn_id=self.aws_conn_id).get_account_number()
+        job_arn = f"arn:{self.conn_partition}:glue:{self.conn_region_name}:{account_number}:job/{job_name}"
+        current_tags: dict = self.conn.get_tags(ResourceArn=job_arn)["Tags"]
+
+        if tags_to_add := {key: value for key, value in job_tags.items() if current_tags.get(key) != value}:
+            self.log.info("Updating job tags: %s", job_name)
+            self.conn.tag_resource(ResourceArn=job_arn, TagsToAdd=tags_to_add)
+        if tags_to_remove := [key for key in current_tags if key not in job_tags]:
+            self.log.info("Removing job tags: %s", job_name)
+            self.conn.untag_resource(ResourceArn=job_arn, TagsToRemove=tags_to_remove)
+        return bool(tags_to_add or tags_to_remove)
 
     def get_or_create_glue_job(self) -> str | None:
         """
diff --git a/providers/amazon/tests/unit/amazon/aws/hooks/test_glue.py b/providers/amazon/tests/unit/amazon/aws/hooks/test_glue.py
index 3e78b296042..1b3dda0c881 100644
--- a/providers/amazon/tests/unit/amazon/aws/hooks/test_glue.py
+++ b/providers/amazon/tests/unit/amazon/aws/hooks/test_glue.py
@@ -26,6 +26,7 @@ import boto3
 import pytest
 from botocore.exceptions import ClientError
 from moto import mock_aws
+from moto.core import DEFAULT_ACCOUNT_ID
 
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook, GlueJobHook
@@ -310,6 +311,74 @@ class TestGlueJobHook:
         )
         assert result == job_name
 
+    @mock_aws
+    @pytest.mark.parametrize(
+        ("current_tags", "desired_tags", "expected_tags", "expected_changed"),
+        [
+            pytest.param(
+                {"env": "dev"},
+                {"env": "dev", "team": "data"},
+                {"env": "dev", "team": "data"},
+                True,
+                id="add-new-tag",
+            ),
+            pytest.param({"env": "dev"}, {"env": "prod"}, {"env": "prod"}, True, id="replace-value"),
+            pytest.param(
+                {"env": "dev", "team": "data"}, {"env": "dev"}, {"env": "dev"}, True, id="remove-tag"
+            ),
+            pytest.param({"env": "dev"}, {}, {}, True, id="remove-all-tags"),
+            pytest.param({"env": "dev"}, {"env": "dev"}, {"env": "dev"}, False, id="no-change"),
+        ],
+    )
+    def test_update_tags(self, current_tags, desired_tags, expected_tags, expected_changed):
+        """Tags are reconciled through the tag API because Glue ``update_job`` cannot modify them."""
+        job_name = "aws_test_glue_job_with_tags"
+        boto3.client("glue", region_name=self.some_aws_region).create_job(
+            Name=job_name,
+            Role="test-role",
+            Command={"Name": "glueetl", "ScriptLocation": "s3://glue-examples/script.py"},
+            Tags=current_tags,
+        )
+        hook = GlueJobHook(job_name=job_name, region_name=self.some_aws_region)
+
+        assert hook.update_tags(job_name, desired_tags) is expected_changed
+
+        job_arn = f"arn:aws:glue:{self.some_aws_region}:{DEFAULT_ACCOUNT_ID}:job/{job_name}"
+        assert hook.conn.get_tags(ResourceArn=job_arn)["Tags"] == expected_tags
+
+    @mock.patch.object(GlueJobHook, "update_tags")
+    @mock.patch.object(AwsBaseHook, "conn")
+    def test_update_job_keeps_tags_out_of_job_update(self, mock_conn, mock_update_tags):
+        """``Tags`` must be stripped from ``JobUpdate`` and reconciled separately."""
+        job_name = "aws_test_glue_job"
+        mock_conn.get_job.return_value = {"Job": {"Name": job_name, "Description": "old"}}
+        mock_update_tags.return_value = False
+        hook = GlueJobHook(job_name=job_name, region_name=self.some_aws_region)
+
+        updated = hook.update_job(Name=job_name, Description="new", Tags={"env": "prod"})
+
+        mock_update_tags.assert_called_once_with(job_name, {"env": "prod"})
+        mock_conn.update_job.assert_called_once_with(JobName=job_name, JobUpdate={"Description": "new"})
+        assert updated is True
+
+    @pytest.mark.parametrize("tags_changed", [True, False])
+    @mock.patch.object(GlueJobHook, "update_tags")
+    @mock.patch.object(AwsBaseHook, "conn")
+    def test_update_job_returns_tag_result_when_config_unchanged(
+        self, mock_conn, mock_update_tags, tags_changed
+    ):
+        """With no config diff, the result reflects whether only the tags changed."""
+        job_name = "aws_test_glue_job"
+        mock_conn.get_job.return_value = {"Job": {"Name": job_name, "Description": "same"}}
+        mock_update_tags.return_value = tags_changed
+        hook = GlueJobHook(job_name=job_name, region_name=self.some_aws_region)
+
+        updated = hook.update_job(Name=job_name, Description="same", Tags={"env": "prod"})
+
+        mock_update_tags.assert_called_once_with(job_name, {"env": "prod"})
+        mock_conn.update_job.assert_not_called()
+        assert updated is tags_changed
+
     @mock_aws
     @mock.patch.object(GlueJobHook, "get_iam_execution_role")
     def test_create_or_update_glue_job_worker_type(self, mock_get_iam_execution_role):

Reply via email to