This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 26f85cf9dd2 Revert "Add resume_glue_job_on_retry to GlueJobOperator (#59392)" (#62730)
26f85cf9dd2 is described below

commit 26f85cf9dd2e830272d788403f8e350b235fbaf6
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue Mar 3 01:19:24 2026 +0100

    Revert "Add resume_glue_job_on_retry to GlueJobOperator (#59392)" (#62730)
    
    This reverts commit 8396957b1f65aeeed13c5ff31d0679437ef2ebb1.
---
 .../airflow/providers/amazon/aws/operators/glue.py |  33 ++-----
 .../tests/unit/amazon/aws/operators/test_glue.py   | 101 ---------------------
 2 files changed, 7 insertions(+), 127 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py
index 19ec65a7190..8e9a6deab44 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py
@@ -138,7 +138,6 @@ class GlueJobOperator(AwsBaseOperator[GlueJobHook]):
         job_poll_interval: int | float = 6,
         waiter_delay: int = 60,
         waiter_max_attempts: int = 75,
-        resume_glue_job_on_retry: bool = False,
         **kwargs,
     ):
         super().__init__(**kwargs)
@@ -168,7 +167,6 @@ class GlueJobOperator(AwsBaseOperator[GlueJobHook]):
         self.s3_script_location: str | None = None
         self.waiter_delay = waiter_delay
         self.waiter_max_attempts = waiter_max_attempts
-        self.resume_glue_job_on_retry = resume_glue_job_on_retry
 
     @property
     def _hook_parameters(self):
@@ -218,30 +216,13 @@ class GlueJobOperator(AwsBaseOperator[GlueJobHook]):
 
         :return: the current Glue job ID.
         """
-        previous_job_run_id = None
-        if self.resume_glue_job_on_retry:
-            ti = context["ti"]
-            previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)
-            if previous_job_run_id:
-                try:
-                    job_run = self.hook.conn.get_job_run(JobName=self.job_name, RunId=previous_job_run_id)
-                    state = job_run.get("JobRun", {}).get("JobRunState")
-                    self.log.info("Previous Glue job_run_id: %s, state: %s", previous_job_run_id, state)
-                    if state in ("RUNNING", "STARTING", "STOPPING"):
-                        self._job_run_id = previous_job_run_id
-                except Exception:
-                    self.log.warning("Failed to get previous Glue job run state", exc_info=True)
-
-        if not self._job_run_id:
-            self.log.info(
-                "Initializing AWS Glue Job: %s. Wait for completion: %s",
-                self.job_name,
-                self.wait_for_completion,
-            )
-            glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-            self._job_run_id = glue_job_run["JobRunId"]
-            context["ti"].xcom_push(key="glue_job_run_id", value=self._job_run_id)
-
+        self.log.info(
+            "Initializing AWS Glue Job: %s. Wait for completion: %s",
+            self.job_name,
+            self.wait_for_completion,
+        )
+        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
+        self._job_run_id = glue_job_run["JobRunId"]
         glue_job_run_url = GlueJobRunDetailsLink.format_str.format(
             aws_domain=GlueJobRunDetailsLink.get_aws_domain(self.hook.conn_partition),
             region_name=self.hook.conn_region_name,
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_glue.py b/providers/amazon/tests/unit/amazon/aws/operators/test_glue.py
index d4e299f26a7..fedf55431a6 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_glue.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_glue.py
@@ -432,107 +432,6 @@ class TestGlueJobOperator:
         )
         assert op.hook.aws_conn_id == DEFAULT_CONN
 
-    @mock.patch.object(GlueJobHook, "get_conn")
-    @mock.patch.object(GlueJobHook, "initialize_job")
-    def test_check_previous_job_id_run_reuse_in_progress(self, mock_initialize_job, mock_get_conn):
-        """Test that when resume_glue_job_on_retry=True and previous job is in progress, it is reused."""
-        glue = GlueJobOperator(
-            task_id=TASK_ID,
-            job_name=JOB_NAME,
-            script_location="s3://folder/file",
-            aws_conn_id="aws_default",
-            region_name="us-west-2",
-            s3_bucket="some_bucket",
-            iam_role_name="my_test_role",
-            resume_glue_job_on_retry=True,
-            wait_for_completion=False,
-        )
-
-        # Mock the context and task instance
-        mock_ti = mock.MagicMock()
-        mock_context = {"ti": mock_ti}
-
-        # Simulate previous job_run_id in XCom
-        previous_job_run_id = "previous_run_12345"
-        mock_ti.xcom_pull.return_value = previous_job_run_id
-
-        # Mock the Glue client to return RUNNING state for the previous job
-        mock_glue_client = mock.MagicMock()
-        glue.hook.conn = mock_glue_client
-        mock_glue_client.get_job_run.return_value = {
-            "JobRun": {
-                "JobRunState": "RUNNING",
-            }
-        }
-
-        # Execute the operator
-        glue.execute(mock_context)
-
-        # Verify that the previous job_run_id was reused
-        assert glue._job_run_id == previous_job_run_id
-        # Verify that initialize_job was NOT called
-        mock_initialize_job.assert_not_called()
-        # Verify that XCom push was not called for glue_job_run_id (since we reused the previous one)
-        # Note: xcom_push may be called for other purposes like glue_job_run_details
-        xcom_calls = [
-            call for call in mock_ti.xcom_push.call_args_list if call[1].get("key") == "glue_job_run_id"
-        ]
-        assert len(xcom_calls) == 0, "Should not push new glue_job_run_id when reusing previous one"
-
-    @mock.patch.object(GlueJobHook, "get_conn")
-    @mock.patch.object(GlueJobHook, "initialize_job")
-    def test_check_previous_job_id_run_new_on_finished(self, mock_initialize_job, mock_get_conn):
-        """Test that when previous job is finished, a new job is started and pushed to XCom."""
-        glue = GlueJobOperator(
-            task_id=TASK_ID,
-            job_name=JOB_NAME,
-            script_location="s3://folder/file",
-            aws_conn_id="aws_default",
-            region_name="us-west-2",
-            s3_bucket="some_bucket",
-            iam_role_name="my_test_role",
-            resume_glue_job_on_retry=True,
-            wait_for_completion=False,
-        )
-
-        # Mock the context and task instance
-        mock_ti = mock.MagicMock()
-        mock_context = {"ti": mock_ti}
-
-        # Simulate previous job_run_id in XCom
-        previous_job_run_id = "previous_run_12345"
-        mock_ti.xcom_pull.return_value = previous_job_run_id
-
-        # Mock the Glue client to return SUCCEEDED state for the previous job
-        mock_glue_client = mock.MagicMock()
-        glue.hook.conn = mock_glue_client
-        mock_glue_client.get_job_run.return_value = {
-            "JobRun": {
-                "JobRunState": "SUCCEEDED",
-            }
-        }
-
-        # Mock initialize_job to return a new job run ID
-        new_job_run_id = "new_run_67890"
-        mock_initialize_job.return_value = {
-            "JobRunState": "RUNNING",
-            "JobRunId": new_job_run_id,
-        }
-
-        # Execute the operator
-        glue.execute(mock_context)
-
-        # Verify that a new job_run_id was created
-        assert glue._job_run_id == new_job_run_id
-        # Verify that initialize_job was called
-        mock_initialize_job.assert_called_once()
-        # Verify that the new job_run_id was pushed to XCom
-        xcom_calls = [
-            call for call in mock_ti.xcom_push.call_args_list if call[1].get("key") == "glue_job_run_id"
-        ]
-        assert len(xcom_calls) == 1, "Should push new glue_job_run_id"
-        assert xcom_calls[0][1]["value"] == new_job_run_id
-
 
 class TestGlueDataQualityOperator:
     RULE_SET_NAME = "TestRuleSet"

Reply via email to