This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d02b66523f2 Fix SageMaker processing stopped state handling (#67291)
d02b66523f2 is described below

commit d02b66523f2985f63fd9b55443fbcf3571543a5e
Author: Aditya Patel <[email protected]>
AuthorDate: Fri May 22 08:15:15 2026 -0400

    Fix SageMaker processing stopped state handling (#67291)
---
 .../providers/amazon/aws/operators/sagemaker.py    |  2 +-
 .../aws/operators/test_sagemaker_processing.py     | 28 ++++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker.py
index c4fadec6a16..c99228a3e42 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker.py
@@ -335,7 +335,7 @@ class SageMakerProcessingOperator(SageMakerBaseOperator):
         if self.deferrable and self.wait_for_completion:
             response = self.hook.describe_processing_job(self.config["ProcessingJobName"])
             status = response["ProcessingJobStatus"]
-            if status in self.hook.failed_states:
+            if status in self.hook.processing_job_failed_states:
                 raise AirflowException(f"SageMaker job failed because {response['FailureReason']}")
             if status == "Completed":
                 self.log.info("%s completed successfully.", self.task_id)
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_sagemaker_processing.py b/providers/amazon/tests/unit/amazon/aws/operators/test_sagemaker_processing.py
index a18058c745e..3bddcf88f3c 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_sagemaker_processing.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_sagemaker_processing.py
@@ -319,6 +319,34 @@ class TestSageMakerProcessingOperator:
 
         assert not mock_defer.called
 
+    @mock.patch("airflow.providers.amazon.aws.operators.sagemaker.SageMakerProcessingOperator.defer")
+    @mock.patch.object(
+        SageMakerHook,
+        "describe_processing_job",
+        return_value={"ProcessingJobStatus": "Stopped", "FailureReason": "It stopped"},
+    )
+    @mock.patch.object(
+        SageMakerHook,
+        "create_processing_job",
+        return_value={"ProcessingJobArn": "test_arn", "ResponseMetadata": {"HTTPStatusCode": 200}},
+    )
+    @mock.patch.object(SageMakerBaseOperator, "_check_if_job_exists", return_value=False)
+    def test_operator_stopped_before_defer(
+        self,
+        mock_job_exists,
+        mock_processing,
+        mock_describe,
+        mock_defer,
+    ):
+        sagemaker_operator = SageMakerProcessingOperator(
+            **self.defer_processing_config_kwargs,
+            config=CREATE_PROCESSING_PARAMS,
+        )
+        with pytest.raises(AirflowException):
+            sagemaker_operator.execute(context=None)
+
+        assert not mock_defer.called
+
     @mock.patch.object(
         SageMakerHook,
         "describe_processing_job",

Reply via email to