Lee-W commented on code in PR #68711:
URL: https://github.com/apache/airflow/pull/68711#discussion_r3437166746


##########
providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -495,7 +498,37 @@ def update_job(self, **job_kwargs) -> bool:
             self.conn.update_job(JobName=job_name, JobUpdate=job_kwargs)
             self.log.info("Updated configurations: %s", update_config)
             return True
-        return False
+        return tags_updated
+
+    def update_tags(self, job_name: str, job_tags: dict) -> bool:
+        """
+        Reconcile a job's tags with the desired set.
+
+        Glue manages tags outside of ``update_job``, so adds/updates go through
+        ``tag_resource`` and removals through ``untag_resource``.
+
+        .. seealso::
+            - :external+boto3:py:meth:`Glue.Client.tag_resource`
+            - :external+boto3:py:meth:`Glue.Client.untag_resource`
+
+        :param job_name: Name of the job for which to update tags
+        :param job_tags: Desired tags. Keys absent from this mapping are 
removed from the job.
+        :return: True if any tag was added, changed, or removed, False 
otherwise
+        """
+        account_number = 
StsHook(aws_conn_id=self.aws_conn_id).get_account_number()
+        job_arn = 
f"arn:{self.conn_partition}:glue:{self.conn_region_name}:{account_number}:job/{job_name}"
+        current_tags: dict = self.conn.get_tags(ResourceArn=job_arn)["Tags"]
+
+        tags_to_add = {key: value for key, value in job_tags.items() if 
current_tags.get(key) != value}
+        tags_to_remove = [key for key in current_tags if key not in job_tags]
+
+        if tags_to_add:
+            self.log.info("Updating job tags: %s", job_name)
+            self.conn.tag_resource(ResourceArn=job_arn, TagsToAdd=tags_to_add)
+        if tags_to_remove:
+            self.log.info("Removing job tags: %s", job_name)
+            self.conn.untag_resource(ResourceArn=job_arn, 
TagsToRemove=tags_to_remove)

Review Comment:
   ```suggestion
           if (tags_to_add := {key: value for key, value in job_tags.items() if 
current_tags.get(key) != value})
               self.log.info("Updating job tags: %s", job_name)
               self.conn.tag_resource(ResourceArn=job_arn, 
TagsToAdd=tags_to_add)
           if (tags_to_remove := [key for key in current_tags if key not in 
job_tags])
               self.log.info("Removing job tags: %s", job_name)
               self.conn.untag_resource(ResourceArn=job_arn, 
TagsToRemove=tags_to_remove)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to