This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c8fc0ade09 Remove deprecated code in AWS provider (#41407)
c8fc0ade09 is described below

commit c8fc0ade0901ff7f08c45545f56caeb1a7d96bd1
Author: Vincent <[email protected]>
AuthorDate: Thu Aug 15 11:20:56 2024 -0400

    Remove deprecated code in AWS provider (#41407)
---
 airflow/providers/amazon/aws/sensors/athena.py     |  8 +---
 airflow/providers/amazon/aws/sensors/batch.py      | 40 +++-------------
 airflow/providers/amazon/aws/sensors/bedrock.py    |  5 +-
 .../amazon/aws/sensors/cloud_formation.py          | 13 +-----
 airflow/providers/amazon/aws/sensors/comprehend.py |  8 +---
 airflow/providers/amazon/aws/sensors/dms.py        | 16 ++-----
 airflow/providers/amazon/aws/sensors/ec2.py        |  8 +---
 airflow/providers/amazon/aws/sensors/ecs.py        | 10 ++--
 airflow/providers/amazon/aws/sensors/eks.py        | 12 ++---
 airflow/providers/amazon/aws/sensors/emr.py        | 45 ++++--------------
 airflow/providers/amazon/aws/sensors/glacier.py    |  8 +---
 airflow/providers/amazon/aws/sensors/glue.py       |  9 ----
 .../amazon/aws/sensors/glue_catalog_partition.py   |  8 +---
 .../providers/amazon/aws/sensors/glue_crawler.py   |  8 +---
 .../amazon/aws/sensors/kinesis_analytics.py        |  5 +-
 .../amazon/aws/sensors/lambda_function.py          | 10 ++--
 .../amazon/aws/sensors/opensearch_serverless.py    |  5 +-
 airflow/providers/amazon/aws/sensors/quicksight.py |  7 +--
 .../amazon/aws/sensors/redshift_cluster.py         |  8 +---
 airflow/providers/amazon/aws/sensors/s3.py         | 17 ++-----
 airflow/providers/amazon/aws/sensors/sagemaker.py  | 10 ++--
 airflow/providers/amazon/aws/sensors/sqs.py        | 14 ++----
 .../providers/amazon/aws/sensors/step_function.py  |  8 +---
 tests/providers/amazon/aws/sensors/test_athena.py  | 15 ++----
 tests/providers/amazon/aws/sensors/test_batch.py   | 46 ++++--------------
 tests/providers/amazon/aws/sensors/test_bedrock.py | 54 ++++++----------------
 .../amazon/aws/sensors/test_cloud_formation.py     | 27 +++--------
 .../amazon/aws/sensors/test_comprehend.py          | 28 +++--------
 tests/providers/amazon/aws/sensors/test_dms.py     | 24 ++--------
 tests/providers/amazon/aws/sensors/test_ecs.py     | 20 +-------
 .../aws/sensors/test_emr_serverless_application.py | 15 +-----
 .../amazon/aws/sensors/test_emr_serverless_job.py  | 13 +-----
 tests/providers/amazon/aws/sensors/test_glacier.py | 14 ++----
 tests/providers/amazon/aws/sensors/test_glue.py    | 10 ++--
 .../aws/sensors/test_glue_catalog_partition.py     | 10 ++--
 .../amazon/aws/sensors/test_glue_crawler.py        | 10 ++--
 .../amazon/aws/sensors/test_glue_data_quality.py   | 28 +++--------
 .../amazon/aws/sensors/test_kinesis_analytics.py   | 28 +++--------
 .../amazon/aws/sensors/test_lambda_function.py     | 10 ++--
 .../aws/sensors/test_opensearch_serverless.py      | 17 ++-----
 .../amazon/aws/sensors/test_quicksight.py          | 17 ++-----
 tests/providers/amazon/aws/sensors/test_s3.py      | 26 +++--------
 .../amazon/aws/sensors/test_sagemaker_base.py      | 10 ++--
 tests/providers/amazon/aws/sensors/test_sqs.py     | 28 +++--------
 .../amazon/aws/sensors/test_step_function.py       | 17 ++-----
 45 files changed, 170 insertions(+), 579 deletions(-)

diff --git a/airflow/providers/amazon/aws/sensors/athena.py b/airflow/providers/amazon/aws/sensors/athena.py
index 38f2bb54f8..1dc8a4fe33 100644
--- a/airflow/providers/amazon/aws/sensors/athena.py
+++ b/airflow/providers/amazon/aws/sensors/athena.py
@@ -25,7 +25,7 @@ from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.athena import AthenaHook
 
 
@@ -88,11 +88,7 @@ class AthenaSensor(AwsBaseSensor[AthenaHook]):
         state = self.hook.poll_query_status(self.query_execution_id, 
self.max_retries, self.sleep_time)
 
         if state in self.FAILURE_STATES:
-            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
-            message = "Athena sensor failed"
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
+            raise AirflowException("Athena sensor failed")
 
         if state in self.INTERMEDIATE_STATES:
             return False
diff --git a/airflow/providers/amazon/aws/sensors/batch.py 
b/airflow/providers/amazon/aws/sensors/batch.py
index c5dcb0e46d..9c1a29f809 100644
--- a/airflow/providers/amazon/aws/sensors/batch.py
+++ b/airflow/providers/amazon/aws/sensors/batch.py
@@ -86,18 +86,7 @@ class BatchSensor(BaseSensorOperator):
         if state in BatchClientHook.INTERMEDIATE_STATES:
             return False
 
-        if state == BatchClientHook.FAILURE_STATE:
-            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
-            message = f"Batch sensor failed. AWS Batch job status: {state}"
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
-
-        # TODO: remove this if block when min_airflow_version is set to higher 
than 2.7.1
-        message = f"Batch sensor failed. Unknown AWS Batch job status: {state}"
-        if self.soft_fail:
-            raise AirflowSkipException(message)
-        raise AirflowException(message)
+        raise AirflowException(f"Batch sensor failed. AWS Batch job status: {state}")
 
     def execute(self, context: Context) -> None:
         if not self.deferrable:
@@ -127,12 +116,7 @@ class BatchSensor(BaseSensorOperator):
         Relies on trigger to throw an exception, otherwise it assumes 
execution was successful.
         """
         if event["status"] != "success":
-            message = f"Error while running job: {event}"
-            # TODO: remove this if-else block when min_airflow_version is set 
to higher than the version that
-            # changed in https://github.com/apache/airflow/pull/33424 is 
released (2.7.1)
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
+            raise AirflowException(f"Error while running job: {event}")
         job_id = event["job_id"]
         self.log.info("Batch Job %s complete", job_id)
 
@@ -198,11 +182,7 @@ class BatchComputeEnvironmentSensor(BaseSensorOperator):
         )
 
         if not response["computeEnvironments"]:
-            message = f"AWS Batch compute environment 
{self.compute_environment} not found"
-            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
+            raise AirflowException(f"AWS Batch compute environment 
{self.compute_environment} not found")
 
         status = response["computeEnvironments"][0]["status"]
 
@@ -212,11 +192,9 @@ class BatchComputeEnvironmentSensor(BaseSensorOperator):
         if status in BatchClientHook.COMPUTE_ENVIRONMENT_INTERMEDIATE_STATUS:
             return False
 
-        # TODO: remove this if block when min_airflow_version is set to higher 
than 2.7.1
-        message = f"AWS Batch compute environment failed. AWS Batch compute 
environment status: {status}"
-        if self.soft_fail:
-            raise AirflowSkipException(message)
-        raise AirflowException(message)
+        raise AirflowException(
+            f"AWS Batch compute environment failed. AWS Batch compute 
environment status: {status}"
+        )
 
 
 class BatchJobQueueSensor(BaseSensorOperator):
@@ -276,11 +254,7 @@ class BatchJobQueueSensor(BaseSensorOperator):
             if self.treat_non_existing_as_deleted:
                 return True
             else:
-                # TODO: remove this if block when min_airflow_version is set 
to higher than 2.7.1
-                message = f"AWS Batch job queue {self.job_queue} not found"
-                if self.soft_fail:
-                    raise AirflowSkipException(message)
-                raise AirflowException(message)
+                raise AirflowException(f"AWS Batch job queue {self.job_queue} 
not found")
 
         status = response["jobQueues"][0]["status"]
 
diff --git a/airflow/providers/amazon/aws/sensors/bedrock.py 
b/airflow/providers/amazon/aws/sensors/bedrock.py
index 8532886554..e9157ab9c1 100644
--- a/airflow/providers/amazon/aws/sensors/bedrock.py
+++ b/airflow/providers/amazon/aws/sensors/bedrock.py
@@ -21,7 +21,7 @@ import abc
 from typing import TYPE_CHECKING, Any, Sequence, TypeVar
 
 from airflow.configuration import conf
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.bedrock import BedrockAgentHook, 
BedrockHook
 from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
 from airflow.providers.amazon.aws.triggers.bedrock import (
@@ -76,9 +76,6 @@ class BedrockBaseSensor(AwsBaseSensor[_GenericBedrockHook]):
     def poke(self, context: Context, **kwargs) -> bool:
         state = self.get_state()
         if state in self.FAILURE_STATES:
-            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
-            if self.soft_fail:
-                raise AirflowSkipException(self.FAILURE_MESSAGE)
             raise AirflowException(self.FAILURE_MESSAGE)
 
         return state not in self.INTERMEDIATE_STATES
diff --git a/airflow/providers/amazon/aws/sensors/cloud_formation.py 
b/airflow/providers/amazon/aws/sensors/cloud_formation.py
index f67278ecec..ba07433bf0 100644
--- a/airflow/providers/amazon/aws/sensors/cloud_formation.py
+++ b/airflow/providers/amazon/aws/sensors/cloud_formation.py
@@ -27,7 +27,6 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
-from airflow.exceptions import AirflowSkipException
 from airflow.providers.amazon.aws.hooks.cloud_formation import 
CloudFormationHook
 
 
@@ -67,11 +66,7 @@ class 
CloudFormationCreateStackSensor(AwsBaseSensor[CloudFormationHook]):
         if stack_status in ("CREATE_IN_PROGRESS", None):
             return False
 
-        # TODO: remove this if check when min_airflow_version is set to higher 
than 2.7.1
-        message = f"Stack {self.stack_name} in bad state: {stack_status}"
-        if self.soft_fail:
-            raise AirflowSkipException(message)
-        raise ValueError(message)
+        raise ValueError(f"Stack {self.stack_name} in bad state: 
{stack_status}")
 
 
 class CloudFormationDeleteStackSensor(AwsBaseSensor[CloudFormationHook]):
@@ -119,8 +114,4 @@ class 
CloudFormationDeleteStackSensor(AwsBaseSensor[CloudFormationHook]):
         if stack_status == "DELETE_IN_PROGRESS":
             return False
 
-        # TODO: remove this if check when min_airflow_version is set to higher 
than 2.7.1
-        message = f"Stack {self.stack_name} in bad state: {stack_status}"
-        if self.soft_fail:
-            raise AirflowSkipException(message)
-        raise ValueError(message)
+        raise ValueError(f"Stack {self.stack_name} in bad state: 
{stack_status}")
diff --git a/airflow/providers/amazon/aws/sensors/comprehend.py 
b/airflow/providers/amazon/aws/sensors/comprehend.py
index 42344f65e9..545f7b02fc 100644
--- a/airflow/providers/amazon/aws/sensors/comprehend.py
+++ b/airflow/providers/amazon/aws/sensors/comprehend.py
@@ -20,7 +20,7 @@ import abc
 from typing import TYPE_CHECKING, Any, Sequence
 
 from airflow.configuration import conf
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.comprehend import ComprehendHook
 from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
 from airflow.providers.amazon.aws.triggers.comprehend import (
@@ -71,9 +71,6 @@ class ComprehendBaseSensor(AwsBaseSensor[ComprehendHook]):
     def poke(self, context: Context, **kwargs) -> bool:
         state = self.get_state()
         if state in self.FAILURE_STATES:
-            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
-            if self.soft_fail:
-                raise AirflowSkipException(self.FAILURE_MESSAGE)
             raise AirflowException(self.FAILURE_MESSAGE)
 
         return state not in self.INTERMEDIATE_STATES
@@ -241,9 +238,6 @@ class 
ComprehendCreateDocumentClassifierCompletedSensor(AwsBaseSensor[Comprehend
         )
 
         if status in self.FAILURE_STATES:
-            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
-            if self.soft_fail:
-                raise AirflowSkipException(self.FAILURE_MESSAGE)
             raise AirflowException(self.FAILURE_MESSAGE)
 
         if status in self.SUCCESS_STATES:
diff --git a/airflow/providers/amazon/aws/sensors/dms.py 
b/airflow/providers/amazon/aws/sensors/dms.py
index 864a3b5276..2ea52ea0b5 100644
--- a/airflow/providers/amazon/aws/sensors/dms.py
+++ b/airflow/providers/amazon/aws/sensors/dms.py
@@ -21,7 +21,7 @@ from typing import TYPE_CHECKING, Iterable, Sequence
 
 from deprecated import deprecated
 
-from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning, AirflowSkipException
+from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.providers.amazon.aws.hooks.dms import DmsHook
 from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
@@ -75,11 +75,9 @@ class DmsTaskBaseSensor(AwsBaseSensor[DmsHook]):
 
     def poke(self, context: Context):
         if not (status := 
self.hook.get_task_status(self.replication_task_arn)):
-            # TODO: remove this if check when min_airflow_version is set to 
higher than 2.7.1
-            message = f"Failed to read task status, task with ARN 
{self.replication_task_arn} not found"
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
+            raise AirflowException(
+                f"Failed to read task status, task with ARN 
{self.replication_task_arn} not found"
+            )
 
         self.log.info("DMS Replication task (%s) has status: %s", 
self.replication_task_arn, status)
 
@@ -87,11 +85,7 @@ class DmsTaskBaseSensor(AwsBaseSensor[DmsHook]):
             return True
 
         if status in self.termination_statuses:
-            # TODO: remove this if check when min_airflow_version is set to 
higher than 2.7.1
-            message = f"Unexpected status: {status}"
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
+            raise AirflowException(f"Unexpected status: {status}")
 
         return False
 
diff --git a/airflow/providers/amazon/aws/sensors/ec2.py 
b/airflow/providers/amazon/aws/sensors/ec2.py
index 778bc49caa..0736c63393 100644
--- a/airflow/providers/amazon/aws/sensors/ec2.py
+++ b/airflow/providers/amazon/aws/sensors/ec2.py
@@ -21,7 +21,7 @@ from functools import cached_property
 from typing import TYPE_CHECKING, Any, Sequence
 
 from airflow.configuration import conf
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.ec2 import EC2Hook
 from airflow.providers.amazon.aws.triggers.ec2 import EC2StateSensorTrigger
 from airflow.providers.amazon.aws.utils import validate_execute_complete_event
@@ -97,8 +97,4 @@ class EC2InstanceStateSensor(BaseSensorOperator):
         event = validate_execute_complete_event(event)
 
         if event["status"] != "success":
-            # TODO: remove this if check when min_airflow_version is set to 
higher than 2.7.1
-            message = f"Error: {event}"
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
+            raise AirflowException(f"Error: {event}")
diff --git a/airflow/providers/amazon/aws/sensors/ecs.py 
b/airflow/providers/amazon/aws/sensors/ecs.py
index 02a212fbde..aba3e55922 100644
--- a/airflow/providers/amazon/aws/sensors/ecs.py
+++ b/airflow/providers/amazon/aws/sensors/ecs.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 from functools import cached_property
 from typing import TYPE_CHECKING, Sequence
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.ecs import (
     EcsClusterStates,
     EcsHook,
@@ -37,11 +37,9 @@ if TYPE_CHECKING:
 
 def _check_failed(current_state, target_state, failure_states, soft_fail: 
bool) -> None:
     if (current_state != target_state) and (current_state in failure_states):
-        # TODO: remove this if block when min_airflow_version is set to higher 
than 2.7.1
-        message = f"Terminal state reached. Current state: {current_state}, 
Expected state: {target_state}"
-        if soft_fail:
-            raise AirflowSkipException(message)
-        raise AirflowException(message)
+        raise AirflowException(
+            f"Terminal state reached. Current state: {current_state}, Expected 
state: {target_state}"
+        )
 
 
 class EcsBaseSensor(AwsBaseSensor[EcsHook]):
diff --git a/airflow/providers/amazon/aws/sensors/eks.py 
b/airflow/providers/amazon/aws/sensors/eks.py
index a5dcdeb0ef..79e160b007 100644
--- a/airflow/providers/amazon/aws/sensors/eks.py
+++ b/airflow/providers/amazon/aws/sensors/eks.py
@@ -22,7 +22,7 @@ from abc import abstractmethod
 from functools import cached_property
 from typing import TYPE_CHECKING, Sequence
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.eks import (
     ClusterStates,
     EksHook,
@@ -106,12 +106,10 @@ class EksBaseSensor(BaseSensorOperator):
         state = self.get_state()
         self.log.info("Current state: %s", state)
         if state in (self.get_terminal_states() - {self.target_state}):
-            # If we reach a terminal state which is not the target state:
-            # TODO: remove this if check when min_airflow_version is set to 
higher than 2.7.1
-            message = f"Terminal state reached. Current state: {state}, 
Expected state: {self.target_state}"
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
+            # If we reach a terminal state which is not the target state
+            raise AirflowException(
+                f"Terminal state reached. Current state: {state}, Expected 
state: {self.target_state}"
+            )
         return state == self.target_state
 
     @abstractmethod
diff --git a/airflow/providers/amazon/aws/sensors/emr.py 
b/airflow/providers/amazon/aws/sensors/emr.py
index 19e026e7a6..e79642d35c 100644
--- a/airflow/providers/amazon/aws/sensors/emr.py
+++ b/airflow/providers/amazon/aws/sensors/emr.py
@@ -27,7 +27,6 @@ from airflow.configuration import conf
 from airflow.exceptions import (
     AirflowException,
     AirflowProviderDeprecationWarning,
-    AirflowSkipException,
 )
 from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook, 
EmrServerlessHook
 from airflow.providers.amazon.aws.links.emr import EmrClusterLink, 
EmrLogsLink, get_log_uri
@@ -91,11 +90,7 @@ class EmrBaseSensor(BaseSensorOperator):
             return True
 
         if state in self.failed_states:
-            # TODO: remove this if check when min_airflow_version is set to 
higher than 2.7.1
-            message = f"EMR job failed: 
{self.failure_message_from_response(response)}"
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
+            raise AirflowException(f"EMR job failed: 
{self.failure_message_from_response(response)}")
 
         return False
 
@@ -172,11 +167,9 @@ class EmrServerlessJobSensor(BaseSensorOperator):
         state = response["jobRun"]["state"]
 
         if state in EmrServerlessHook.JOB_FAILURE_STATES:
-            failure_message = f"EMR Serverless job failed: 
{self.failure_message_from_response(response)}"
-            # TODO: remove this if check when min_airflow_version is set to 
higher than 2.7.1
-            if self.soft_fail:
-                raise AirflowSkipException(failure_message)
-            raise AirflowException(failure_message)
+            raise AirflowException(
+                f"EMR Serverless job failed: 
{self.failure_message_from_response(response)}"
+            )
 
         return state in self.target_states
 
@@ -234,13 +227,9 @@ class EmrServerlessApplicationSensor(BaseSensorOperator):
         state = response["application"]["state"]
 
         if state in EmrServerlessHook.APPLICATION_FAILURE_STATES:
-            # TODO: remove this if check when min_airflow_version is set to 
higher than 2.7.1
-            failure_message = (
+            raise AirflowException(
                 f"EMR Serverless application failed: 
{self.failure_message_from_response(response)}"
             )
-            if self.soft_fail:
-                raise AirflowSkipException(failure_message)
-            raise AirflowException(failure_message)
 
         return state in self.target_states
 
@@ -328,11 +317,7 @@ class EmrContainerSensor(BaseSensorOperator):
         )
 
         if state in self.FAILURE_STATES:
-            # TODO: remove this if check when min_airflow_version is set to 
higher than 2.7.1
-            message = "EMR Containers sensor failed"
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
+            raise AirflowException("EMR Containers sensor failed")
 
         if state in self.INTERMEDIATE_STATES:
             return False
@@ -370,11 +355,7 @@ class EmrContainerSensor(BaseSensorOperator):
         event = validate_execute_complete_event(event)
 
         if event["status"] != "success":
-            # TODO: remove this if check when min_airflow_version is set to 
higher than 2.7.1
-            message = f"Error while running job: {event}"
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
+            raise AirflowException(f"Error while running job: {event}")
 
         self.log.info("Job completed.")
 
@@ -563,11 +544,7 @@ class EmrJobFlowSensor(EmrBaseSensor):
         event = validate_execute_complete_event(event)
 
         if event["status"] != "success":
-            # TODO: remove this if check when min_airflow_version is set to 
higher than 2.7.1
-            message = f"Error while running job: {event}"
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
+            raise AirflowException(f"Error while running job: {event}")
         self.log.info("Job completed.")
 
 
@@ -696,10 +673,6 @@ class EmrStepSensor(EmrBaseSensor):
         event = validate_execute_complete_event(event)
 
         if event["status"] != "success":
-            # TODO: remove this if check when min_airflow_version is set to 
higher than 2.7.1
-            message = f"Error while running job: {event}"
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
+            raise AirflowException(f"Error while running job: {event}")
 
         self.log.info("Job %s completed.", self.job_flow_id)
diff --git a/airflow/providers/amazon/aws/sensors/glacier.py 
b/airflow/providers/amazon/aws/sensors/glacier.py
index 7a65fc6fc3..eb87d8a51a 100644
--- a/airflow/providers/amazon/aws/sensors/glacier.py
+++ b/airflow/providers/amazon/aws/sensors/glacier.py
@@ -20,7 +20,7 @@ from __future__ import annotations
 from enum import Enum
 from typing import TYPE_CHECKING, Any, Sequence
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.glacier import GlacierHook
 from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
@@ -93,10 +93,6 @@ class GlacierJobOperationSensor(AwsBaseSensor[GlacierHook]):
             self.log.warning("Code status: %s", response["StatusCode"])
             return False
         else:
-            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
-            message = (
+            raise AirflowException(
                 f'Sensor failed. Job status: {response["Action"]}, code 
status: {response["StatusCode"]}'
             )
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
diff --git a/airflow/providers/amazon/aws/sensors/glue.py 
b/airflow/providers/amazon/aws/sensors/glue.py
index 8493c2fd4a..062e4ab3ef 100644
--- a/airflow/providers/amazon/aws/sensors/glue.py
+++ b/airflow/providers/amazon/aws/sensors/glue.py
@@ -86,9 +86,6 @@ class GlueJobSensor(BaseSensorOperator):
             elif job_state in self.errored_states:
                 job_error_message = "Exiting Job %s Run State: %s", 
self.run_id, job_state
                 self.log.info(job_error_message)
-                # TODO: remove this if block when min_airflow_version is set 
to higher than 2.7.1
-                if self.soft_fail:
-                    raise AirflowSkipException(job_error_message)
                 raise AirflowException(job_error_message)
             else:
                 return False
@@ -223,9 +220,6 @@ class 
GlueDataQualityRuleSetEvaluationRunSensor(AwsBaseSensor[GlueDataQualityHoo
                 f": {response.get('ErrorString')}"
             )
             self.log.info(job_error_message)
-            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
-            if self.soft_fail:
-                raise AirflowSkipException(job_error_message)
             raise AirflowException(job_error_message)
         else:
             return False
@@ -343,9 +337,6 @@ class 
GlueDataQualityRuleRecommendationRunSensor(AwsBaseSensor[GlueDataQualityHo
                 f": {response.get('ErrorString')}"
             )
             self.log.info(job_error_message)
-            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
-            if self.soft_fail:
-                raise AirflowSkipException(job_error_message)
             raise AirflowException(job_error_message)
         else:
             return False
diff --git a/airflow/providers/amazon/aws/sensors/glue_catalog_partition.py 
b/airflow/providers/amazon/aws/sensors/glue_catalog_partition.py
index f397a446be..af125e2dda 100644
--- a/airflow/providers/amazon/aws/sensors/glue_catalog_partition.py
+++ b/airflow/providers/amazon/aws/sensors/glue_catalog_partition.py
@@ -23,7 +23,7 @@ from typing import TYPE_CHECKING, Any, Sequence
 from deprecated import deprecated
 
 from airflow.configuration import conf
-from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning, AirflowSkipException
+from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.providers.amazon.aws.hooks.glue_catalog import GlueCatalogHook
 from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
 from airflow.providers.amazon.aws.triggers.glue import 
GlueCatalogPartitionTrigger
@@ -127,11 +127,7 @@ class 
GlueCatalogPartitionSensor(AwsBaseSensor[GlueCatalogHook]):
         event = validate_execute_complete_event(event)
 
         if event["status"] != "success":
-            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
-            message = f"Trigger error: event is {event}"
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
+            raise AirflowException(f"Trigger error: event is {event}")
         self.log.info("Partition exists in the Glue Catalog")
 
     @deprecated(reason="use `hook` property instead.", 
category=AirflowProviderDeprecationWarning)
diff --git a/airflow/providers/amazon/aws/sensors/glue_crawler.py 
b/airflow/providers/amazon/aws/sensors/glue_crawler.py
index ce35aef2cf..2d4396c010 100644
--- a/airflow/providers/amazon/aws/sensors/glue_crawler.py
+++ b/airflow/providers/amazon/aws/sensors/glue_crawler.py
@@ -21,7 +21,7 @@ from typing import TYPE_CHECKING, Sequence
 
 from deprecated import deprecated
 
-from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning, AirflowSkipException
+from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.providers.amazon.aws.hooks.glue_crawler import GlueCrawlerHook
 from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
@@ -75,11 +75,7 @@ class GlueCrawlerSensor(AwsBaseSensor[GlueCrawlerHook]):
                 self.log.info("Status: %s", crawler_status)
                 return True
             else:
-                # TODO: remove this if block when min_airflow_version is set 
to higher than 2.7.1
-                message = f"Status: {crawler_status}"
-                if self.soft_fail:
-                    raise AirflowSkipException(message)
-                raise AirflowException(message)
+                raise AirflowException(f"Status: {crawler_status}")
         else:
             return False
 
diff --git a/airflow/providers/amazon/aws/sensors/kinesis_analytics.py 
b/airflow/providers/amazon/aws/sensors/kinesis_analytics.py
index 2c02e050c7..673445e67d 100644
--- a/airflow/providers/amazon/aws/sensors/kinesis_analytics.py
+++ b/airflow/providers/amazon/aws/sensors/kinesis_analytics.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 from typing import TYPE_CHECKING, Any, Sequence
 
 from airflow.configuration import conf
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.kinesis_analytics import 
KinesisAnalyticsV2Hook
 from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
 from airflow.providers.amazon.aws.triggers.kinesis_analytics import (
@@ -80,9 +80,6 @@ class 
KinesisAnalyticsV2BaseSensor(AwsBaseSensor[KinesisAnalyticsV2Hook]):
         )
 
         if status in self.FAILURE_STATES:
-            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
-            if self.soft_fail:
-                raise AirflowSkipException(self.FAILURE_MESSAGE)
             raise AirflowException(self.FAILURE_MESSAGE)
 
         if status in self.SUCCESS_STATES:
diff --git a/airflow/providers/amazon/aws/sensors/lambda_function.py 
b/airflow/providers/amazon/aws/sensors/lambda_function.py
index c54dfbe8b7..8e01d40235 100644
--- a/airflow/providers/amazon/aws/sensors/lambda_function.py
+++ b/airflow/providers/amazon/aws/sensors/lambda_function.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING, Any, Sequence
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook
 from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
 from airflow.providers.amazon.aws.utils import trim_none_values
@@ -78,10 +78,8 @@ class LambdaFunctionStateSensor(AwsBaseSensor[LambdaHook]):
         state = 
self.hook.conn.get_function(**trim_none_values(get_function_args))["Configuration"]["State"]
 
         if state in self.FAILURE_STATES:
-            message = "Lambda function state sensor failed because the Lambda 
is in a failed state"
-            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
+            raise AirflowException(
+                "Lambda function state sensor failed because the Lambda is in 
a failed state"
+            )
 
         return state in self.target_states
diff --git a/airflow/providers/amazon/aws/sensors/opensearch_serverless.py b/airflow/providers/amazon/aws/sensors/opensearch_serverless.py
index 7f5f650e0e..0f539f9bfb 100644
--- a/airflow/providers/amazon/aws/sensors/opensearch_serverless.py
+++ b/airflow/providers/amazon/aws/sensors/opensearch_serverless.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 from typing import TYPE_CHECKING, Any, Sequence
 
 from airflow.configuration import conf
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.opensearch_serverless import 
OpenSearchServerlessHook
 from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
 from airflow.providers.amazon.aws.triggers.opensearch_serverless import (
@@ -104,9 +104,6 @@ class OpenSearchServerlessCollectionActiveSensor(AwsBaseSensor[OpenSearchServerl
         state = 
self.hook.conn.batch_get_collection(**call_args)["collectionDetails"][0]["status"]
 
         if state in self.FAILURE_STATES:
-            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
-            if self.soft_fail:
-                raise AirflowSkipException(self.FAILURE_MESSAGE)
             raise AirflowException(self.FAILURE_MESSAGE)
 
         if state in self.INTERMEDIATE_STATES:
diff --git a/airflow/providers/amazon/aws/sensors/quicksight.py b/airflow/providers/amazon/aws/sensors/quicksight.py
index 321fa56dd2..848c0dc704 100644
--- a/airflow/providers/amazon/aws/sensors/quicksight.py
+++ b/airflow/providers/amazon/aws/sensors/quicksight.py
@@ -22,7 +22,7 @@ from typing import TYPE_CHECKING, Sequence
 
 from deprecated import deprecated
 
-from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning, AirflowSkipException
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.providers.amazon.aws.hooks.quicksight import QuickSightHook
 from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
 
@@ -74,10 +74,7 @@ class QuickSightSensor(AwsBaseSensor[QuickSightHook]):
         self.log.info("QuickSight Status: %s", quicksight_ingestion_state)
         if quicksight_ingestion_state in self.errored_statuses:
             error = self.hook.get_error_info(None, self.data_set_id, 
self.ingestion_id)
-            message = f"The QuickSight Ingestion failed. Error info: {error}"
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
+            raise AirflowException(f"The QuickSight Ingestion failed. Error info: {error}")
         return quicksight_ingestion_state == self.success_status
 
     @cached_property
diff --git a/airflow/providers/amazon/aws/sensors/redshift_cluster.py b/airflow/providers/amazon/aws/sensors/redshift_cluster.py
index 7d2f4ba472..243c71e61f 100644
--- a/airflow/providers/amazon/aws/sensors/redshift_cluster.py
+++ b/airflow/providers/amazon/aws/sensors/redshift_cluster.py
@@ -23,7 +23,7 @@ from typing import TYPE_CHECKING, Any, Sequence
 from deprecated import deprecated
 
 from airflow.configuration import conf
-from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning, AirflowSkipException
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
 from airflow.providers.amazon.aws.triggers.redshift_cluster import 
RedshiftClusterTrigger
 from airflow.providers.amazon.aws.utils import validate_execute_complete_event
@@ -93,11 +93,7 @@ class RedshiftClusterSensor(BaseSensorOperator):
 
         status = event["status"]
         if status == "error":
-            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
-            message = f"{event['status']}: {event['message']}"
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
+            raise AirflowException(f"{event['status']}: {event['message']}")
         elif status == "success":
             self.log.info("%s completed successfully.", self.task_id)
             self.log.info("Cluster Identifier %s is in %s state", 
self.cluster_identifier, self.target_status)
diff --git a/airflow/providers/amazon/aws/sensors/s3.py b/airflow/providers/amazon/aws/sensors/s3.py
index 9c524494cd..2f32fff3d3 100644
--- a/airflow/providers/amazon/aws/sensors/s3.py
+++ b/airflow/providers/amazon/aws/sensors/s3.py
@@ -33,7 +33,7 @@ from airflow.providers.amazon.aws.utils import validate_execute_complete_event
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
-from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning, AirflowSkipException
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.triggers.s3 import S3KeysUnchangedTrigger, 
S3KeyTrigger
 from airflow.sensors.base import BaseSensorOperator, poke_mode_only
@@ -219,9 +219,6 @@ class S3KeySensor(BaseSensorOperator):
             if not found_keys:
                 self._defer()
         elif event["status"] == "error":
-            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
-            if self.soft_fail:
-                raise AirflowSkipException(event["message"])
             raise AirflowException(event["message"])
 
     @deprecated(reason="use `hook` property instead.", 
category=AirflowProviderDeprecationWarning)
@@ -342,14 +339,9 @@ class S3KeysUnchangedSensor(BaseSensorOperator):
                 )
                 return False
 
-            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
-            message = (
-                f"Illegal behavior: objects were deleted in"
-                f" {os.path.join(self.bucket_name, self.prefix)} between 
pokes."
+            raise AirflowException(
+                f"Illegal behavior: objects were deleted in {os.path.join(self.bucket_name, self.prefix)} between pokes."
             )
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
 
         if self.last_activity_time:
             self.inactivity_seconds = int((datetime.now() - 
self.last_activity_time).total_seconds())
@@ -411,8 +403,5 @@ class S3KeysUnchangedSensor(BaseSensorOperator):
         event = validate_execute_complete_event(event)
 
         if event and event["status"] == "error":
-            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
-            if self.soft_fail:
-                raise AirflowSkipException(event["message"])
             raise AirflowException(event["message"])
         return None
diff --git a/airflow/providers/amazon/aws/sensors/sagemaker.py b/airflow/providers/amazon/aws/sensors/sagemaker.py
index 97ac8ad483..b01e24cd5b 100644
--- a/airflow/providers/amazon/aws/sensors/sagemaker.py
+++ b/airflow/providers/amazon/aws/sensors/sagemaker.py
@@ -22,7 +22,7 @@ from typing import TYPE_CHECKING, Sequence
 
 from deprecated import deprecated
 
-from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning, AirflowSkipException
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.providers.amazon.aws.hooks.sagemaker import LogState, 
SageMakerHook
 from airflow.sensors.base import BaseSensorOperator
 
@@ -65,11 +65,9 @@ class SageMakerBaseSensor(BaseSensorOperator):
             return False
         if state in self.failed_states():
             failed_reason = self.get_failed_reason_from_response(response)
-            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
-            message = f"Sagemaker {self.resource_type} failed for the 
following reason: {failed_reason}"
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
+            raise AirflowException(
+                f"Sagemaker {self.resource_type} failed for the following reason: {failed_reason}"
+            )
         return True
 
     def non_terminal_states(self) -> set[str]:
diff --git a/airflow/providers/amazon/aws/sensors/sqs.py b/airflow/providers/amazon/aws/sensors/sqs.py
index 657c6d9599..d04a8cf820 100644
--- a/airflow/providers/amazon/aws/sensors/sqs.py
+++ b/airflow/providers/amazon/aws/sensors/sqs.py
@@ -25,7 +25,7 @@ from typing import TYPE_CHECKING, Any, Collection, Sequence
 from deprecated import deprecated
 
 from airflow.configuration import conf
-from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning, AirflowSkipException
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.providers.amazon.aws.hooks.sqs import SqsHook
 from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
 from airflow.providers.amazon.aws.triggers.sqs import SqsSensorTrigger
@@ -160,11 +160,7 @@ class SqsSensor(AwsBaseSensor[SqsHook]):
         event = validate_execute_complete_event(event)
 
         if event["status"] != "success":
-            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
-            message = f"Trigger error: event is {event}"
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
+            raise AirflowException(f"Trigger error: event is {event}")
         context["ti"].xcom_push(key="messages", value=event["message_batch"])
 
     def poll_sqs(self, sqs_conn: BaseAwsConnection) -> Collection:
@@ -221,11 +217,7 @@ class SqsSensor(AwsBaseSensor[SqsHook]):
                 response = 
self.hook.conn.delete_message_batch(QueueUrl=self.sqs_queue, Entries=entries)
 
                 if "Successful" not in response:
-                    # TODO: remove this if block when min_airflow_version is 
set to higher than 2.7.1
-                    error_message = f"Delete SQS Messages failed {response} 
for messages {messages}"
-                    if self.soft_fail:
-                        raise AirflowSkipException(error_message)
-                    raise AirflowException(error_message)
+                    raise AirflowException(f"Delete SQS Messages failed {response} for messages {messages}")
         if message_batch:
             context["ti"].xcom_push(key="messages", value=message_batch)
             return True
diff --git a/airflow/providers/amazon/aws/sensors/step_function.py b/airflow/providers/amazon/aws/sensors/step_function.py
index 5e0d3cfcf7..8af3bb6fe9 100644
--- a/airflow/providers/amazon/aws/sensors/step_function.py
+++ b/airflow/providers/amazon/aws/sensors/step_function.py
@@ -21,7 +21,7 @@ from typing import TYPE_CHECKING, Sequence
 
 from deprecated import deprecated
 
-from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning, AirflowSkipException
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.providers.amazon.aws.hooks.step_function import StepFunctionHook
 from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
@@ -76,11 +76,7 @@ class StepFunctionExecutionSensor(AwsBaseSensor[StepFunctionHook]):
         output = json.loads(execution_status["output"]) if "output" in 
execution_status else None
 
         if state in self.FAILURE_STATES:
-            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
-            message = f"Step Function sensor failed. State Machine Output: 
{output}"
-            if self.soft_fail:
-                raise AirflowSkipException(message)
-            raise AirflowException(message)
+            raise AirflowException(f"Step Function sensor failed. State Machine Output: {output}")
 
         if state in self.INTERMEDIATE_STATES:
             return False
diff --git a/tests/providers/amazon/aws/sensors/test_athena.py b/tests/providers/amazon/aws/sensors/test_athena.py
index a973c76a38..c770ab7dde 100644
--- a/tests/providers/amazon/aws/sensors/test_athena.py
+++ b/tests/providers/amazon/aws/sensors/test_athena.py
@@ -21,7 +21,7 @@ from unittest import mock
 
 import pytest
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.athena import AthenaHook
 from airflow.providers.amazon.aws.sensors.athena import AthenaSensor
 
@@ -73,17 +73,10 @@ class TestAthenaSensor:
         mock_poll_query_status.side_effect = [state]
         assert self.sensor.poke({}) is False
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(False, AirflowException, id="not-soft-fail"),
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-        ],
-    )
     @pytest.mark.parametrize("state", ["FAILED", "CANCELLED"])
-    def test_poke_failure_states(self, state, soft_fail, expected_exception, 
mock_poll_query_status):
+    def test_poke_failure_states(self, state, mock_poll_query_status):
         mock_poll_query_status.side_effect = [state]
-        sensor = AthenaSensor(**self.default_op_kwargs, aws_conn_id=None, 
soft_fail=soft_fail)
+        sensor = AthenaSensor(**self.default_op_kwargs, aws_conn_id=None)
         message = "Athena sensor failed"
-        with pytest.raises(expected_exception, match=message):
+        with pytest.raises(AirflowException, match=message):
             sensor.poke({})
diff --git a/tests/providers/amazon/aws/sensors/test_batch.py b/tests/providers/amazon/aws/sensors/test_batch.py
index 267aeb998f..a8ec1b926b 100644
--- a/tests/providers/amazon/aws/sensors/test_batch.py
+++ b/tests/providers/amazon/aws/sensors/test_batch.py
@@ -20,7 +20,7 @@ from unittest import mock
 
 import pytest
 
-from airflow.exceptions import AirflowException, AirflowSkipException, 
TaskDeferred
+from airflow.exceptions import AirflowException, TaskDeferred
 from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook
 from airflow.providers.amazon.aws.sensors.batch import (
     BatchComputeEnvironmentSensor,
@@ -67,9 +67,7 @@ class TestBatchSensor:
     @mock.patch.object(BatchClientHook, "get_job_description")
     def test_poke_on_invalid_state(self, mock_get_job_description, 
batch_sensor: BatchSensor):
         mock_get_job_description.return_value = {"status": "INVALID"}
-        with pytest.raises(
-            AirflowException, match="Batch sensor failed. Unknown AWS Batch 
job status: INVALID"
-        ):
+        with pytest.raises(AirflowException, match="Batch sensor failed. AWS 
Batch job status: INVALID"):
             batch_sensor.poke({})
 
         mock_get_job_description.assert_called_once_with(JOB_ID)
@@ -100,23 +98,11 @@ class TestBatchSensor:
         with pytest.raises(AirflowException):
             deferrable_batch_sensor.execute_complete(context={}, 
event={"status": "failure"})
 
-    def test_execute_failure_in_deferrable_mode_with_soft_fail(self, 
deferrable_batch_sensor: BatchSensor):
-        """Tests that an AirflowSkipException is raised in case of error event 
and soft_fail is set to True"""
-        deferrable_batch_sensor.soft_fail = True
-        with pytest.raises(AirflowSkipException):
-            deferrable_batch_sensor.execute_complete(context={}, 
event={"status": "failure"})
-
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception", ((False, AirflowException), (True, 
AirflowSkipException))
-    )
     @pytest.mark.parametrize(
-        "state, error_message",
+        "state",
         (
-            (
-                BatchClientHook.FAILURE_STATE,
-                f"Batch sensor failed. AWS Batch job status: 
{BatchClientHook.FAILURE_STATE}",
-            ),
-            ("unknown_state", "Batch sensor failed. Unknown AWS Batch job 
status: unknown_state"),
+            BatchClientHook.FAILURE_STATE,
+            "unknown_state",
         ),
     )
     @mock.patch.object(BatchClientHook, "get_job_description")
@@ -125,13 +111,9 @@ class TestBatchSensor:
         mock_get_job_description,
         batch_sensor: BatchSensor,
         state,
-        error_message,
-        soft_fail,
-        expected_exception,
     ):
         mock_get_job_description.return_value = {"status": state}
-        batch_sensor.soft_fail = soft_fail
-        with pytest.raises(expected_exception, match=error_message):
+        with pytest.raises(AirflowException, match=f"Batch sensor failed. AWS 
Batch job status: {state}"):
             batch_sensor.poke({})
 
 
@@ -202,9 +184,6 @@ class TestBatchComputeEnvironmentSensor:
         )
         assert "AWS Batch compute environment failed" in str(ctx.value)
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception", ((False, AirflowException), (True, 
AirflowSkipException))
-    )
     @pytest.mark.parametrize(
         "compute_env, error_message",
         (
@@ -222,12 +201,9 @@ class TestBatchComputeEnvironmentSensor:
         batch_compute_environment_sensor: BatchComputeEnvironmentSensor,
         compute_env,
         error_message,
-        soft_fail,
-        expected_exception,
     ):
         mock_batch_client.describe_compute_environments.return_value = 
{"computeEnvironments": compute_env}
-        batch_compute_environment_sensor.soft_fail = soft_fail
-        with pytest.raises(expected_exception, match=error_message):
+        with pytest.raises(AirflowException, match=error_message):
             batch_compute_environment_sensor.poke({})
 
 
@@ -299,9 +275,6 @@ class TestBatchJobQueueSensor:
         )
         assert "AWS Batch job queue failed" in str(ctx.value)
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception", ((False, AirflowException), (True, 
AirflowSkipException))
-    )
     @pytest.mark.parametrize("job_queue", ([], [{"status": "UNKNOWN_STATUS"}]))
     @mock.patch.object(BatchClientHook, "client")
     def test_fail_poke(
@@ -309,12 +282,9 @@ class TestBatchJobQueueSensor:
         mock_batch_client,
         batch_job_queue_sensor: BatchJobQueueSensor,
         job_queue,
-        soft_fail,
-        expected_exception,
     ):
         mock_batch_client.describe_job_queues.return_value = {"jobQueues": 
job_queue}
         batch_job_queue_sensor.treat_non_existing_as_deleted = False
-        batch_job_queue_sensor.soft_fail = soft_fail
         message = "AWS Batch job queue"
-        with pytest.raises(expected_exception, match=message):
+        with pytest.raises(AirflowException, match=message):
             batch_job_queue_sensor.poke({})
diff --git a/tests/providers/amazon/aws/sensors/test_bedrock.py b/tests/providers/amazon/aws/sensors/test_bedrock.py
index 4ed5319316..52151cd953 100644
--- a/tests/providers/amazon/aws/sensors/test_bedrock.py
+++ b/tests/providers/amazon/aws/sensors/test_bedrock.py
@@ -21,7 +21,7 @@ from unittest import mock
 
 import pytest
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.bedrock import BedrockAgentHook, 
BedrockHook
 from airflow.providers.amazon.aws.sensors.bedrock import (
     BedrockCustomizeModelCompletedSensor,
@@ -75,19 +75,12 @@ class TestBedrockCustomizeModelCompletedSensor:
         mock_conn.get_model_customization_job.return_value = {"status": state}
         assert self.sensor.poke({}) is False
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(False, AirflowException, id="not-soft-fail"),
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-        ],
-    )
     @pytest.mark.parametrize("state", SENSOR.FAILURE_STATES)
     @mock.patch.object(BedrockHook, "conn")
-    def test_poke_failure_states(self, mock_conn, state, soft_fail, 
expected_exception):
+    def test_poke_failure_states(self, mock_conn, state):
         mock_conn.get_model_customization_job.return_value = {"status": state}
-        sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None, 
soft_fail=soft_fail)
-        with pytest.raises(expected_exception, match=sensor.FAILURE_MESSAGE):
+        sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None)
+        with pytest.raises(AirflowException, match=sensor.FAILURE_MESSAGE):
             sensor.poke({})
 
 
@@ -135,20 +128,13 @@ class TestBedrockProvisionModelThroughputCompletedSensor:
         mock_conn.get_provisioned_model_throughput.return_value = {"status": 
state}
         assert self.sensor.poke({}) is False
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(False, AirflowException, id="not-soft-fail"),
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-        ],
-    )
     @pytest.mark.parametrize("state", SENSOR.FAILURE_STATES)
     @mock.patch.object(BedrockHook, "conn")
-    def test_poke_failure_states(self, mock_conn, state, soft_fail, 
expected_exception):
+    def test_poke_failure_states(self, mock_conn, state):
         mock_conn.get_provisioned_model_throughput.return_value = {"status": 
state}
-        sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None, 
soft_fail=soft_fail)
+        sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None)
 
-        with pytest.raises(expected_exception, match=sensor.FAILURE_MESSAGE):
+        with pytest.raises(AirflowException, match=sensor.FAILURE_MESSAGE):
             sensor.poke({})
 
 
@@ -196,19 +182,12 @@ class TestBedrockKnowledgeBaseActiveSensor:
         mock_conn.get_knowledge_base.return_value = {"knowledgeBase": 
{"status": state}}
         assert self.sensor.poke({}) is False
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(False, AirflowException, id="not-soft-fail"),
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-        ],
-    )
     @pytest.mark.parametrize("state", SENSOR.FAILURE_STATES)
     @mock.patch.object(BedrockAgentHook, "conn")
-    def test_poke_failure_states(self, mock_conn, state, soft_fail, 
expected_exception):
+    def test_poke_failure_states(self, mock_conn, state):
         mock_conn.get_knowledge_base.return_value = {"knowledgeBase": 
{"status": state}}
-        sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None, 
soft_fail=soft_fail)
-        with pytest.raises(expected_exception, match=sensor.FAILURE_MESSAGE):
+        sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None)
+        with pytest.raises(AirflowException, match=sensor.FAILURE_MESSAGE):
             sensor.poke({})
 
 
@@ -258,17 +237,10 @@ class TestBedrockIngestionJobSensor:
         mock_conn.get_ingestion_job.return_value = {"ingestionJob": {"status": 
state}}
         assert self.sensor.poke({}) is False
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(False, AirflowException, id="not-soft-fail"),
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-        ],
-    )
     @pytest.mark.parametrize("state", SENSOR.FAILURE_STATES)
     @mock.patch.object(BedrockAgentHook, "conn")
-    def test_poke_failure_states(self, mock_conn, state, soft_fail, 
expected_exception):
+    def test_poke_failure_states(self, mock_conn, state):
         mock_conn.get_ingestion_job.return_value = {"ingestionJob": {"status": 
state}}
-        sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None, 
soft_fail=soft_fail)
-        with pytest.raises(expected_exception, match=sensor.FAILURE_MESSAGE):
+        sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None)
+        with pytest.raises(AirflowException, match=sensor.FAILURE_MESSAGE):
             sensor.poke({})
diff --git a/tests/providers/amazon/aws/sensors/test_cloud_formation.py b/tests/providers/amazon/aws/sensors/test_cloud_formation.py
index 7edc252864..5149346371 100644
--- a/tests/providers/amazon/aws/sensors/test_cloud_formation.py
+++ b/tests/providers/amazon/aws/sensors/test_cloud_formation.py
@@ -23,7 +23,6 @@ import boto3
 import pytest
 from moto import mock_aws
 
-from airflow.exceptions import AirflowSkipException
 from airflow.providers.amazon.aws.sensors.cloud_formation import (
     CloudFormationCreateStackSensor,
     CloudFormationDeleteStackSensor,
@@ -76,17 +75,10 @@ class TestCloudFormationCreateStackSensor:
         op = CloudFormationCreateStackSensor(task_id="task", stack_name="foo")
         assert not op.poke({})
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-            pytest.param(False, ValueError, id="non-soft-fail"),
-        ],
-    )
-    def test_poke_stack_in_unsuccessful_state(self, mocked_hook_client, 
soft_fail, expected_exception):
+    def test_poke_stack_in_unsuccessful_state(self, mocked_hook_client):
         mocked_hook_client.describe_stacks.return_value = {"Stacks": 
[{"StackStatus": "bar"}]}
-        op = CloudFormationCreateStackSensor(task_id="task", stack_name="foo", 
soft_fail=soft_fail)
-        with pytest.raises(expected_exception, match="Stack foo in bad state: 
bar"):
+        op = CloudFormationCreateStackSensor(task_id="task", stack_name="foo")
+        with pytest.raises(ValueError, match="Stack foo in bad state: bar"):
             op.poke({})
 
 
@@ -132,17 +124,10 @@ class TestCloudFormationDeleteStackSensor:
         op = CloudFormationDeleteStackSensor(task_id="task", stack_name="foo")
         assert not op.poke({})
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-            pytest.param(False, ValueError, id="non-soft-fail"),
-        ],
-    )
-    def test_poke_stack_in_unsuccessful_state(self, mocked_hook_client, 
soft_fail, expected_exception):
+    def test_poke_stack_in_unsuccessful_state(self, mocked_hook_client):
         mocked_hook_client.describe_stacks.return_value = {"Stacks": 
[{"StackStatus": "bar"}]}
-        op = CloudFormationDeleteStackSensor(task_id="task", stack_name="foo", 
soft_fail=soft_fail)
-        with pytest.raises(expected_exception, match="Stack foo in bad state: 
bar"):
+        op = CloudFormationDeleteStackSensor(task_id="task", stack_name="foo")
+        with pytest.raises(ValueError, match="Stack foo in bad state: bar"):
             op.poke({})
 
     @mock_aws
diff --git a/tests/providers/amazon/aws/sensors/test_comprehend.py b/tests/providers/amazon/aws/sensors/test_comprehend.py
index 1c80ca5f79..20e2dc25dd 100644
--- a/tests/providers/amazon/aws/sensors/test_comprehend.py
+++ b/tests/providers/amazon/aws/sensors/test_comprehend.py
@@ -20,7 +20,7 @@ from unittest import mock
 
 import pytest
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.comprehend import ComprehendHook
 from airflow.providers.amazon.aws.sensors.comprehend import (
     ComprehendCreateDocumentClassifierCompletedSensor,
@@ -76,22 +76,15 @@ class TestComprehendStartPiiEntitiesDetectionJobCompletedSensor:
         }
         assert self.sensor.poke({}) is False
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(False, AirflowException, id="not-soft-fail"),
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-        ],
-    )
     @pytest.mark.parametrize("state", SENSOR.FAILURE_STATES)
     @mock.patch.object(ComprehendHook, "conn")
-    def test_poke_failure_states(self, mock_conn, state, soft_fail, 
expected_exception):
+    def test_poke_failure_states(self, mock_conn, state):
         mock_conn.describe_pii_entities_detection_job.return_value = {
             "PiiEntitiesDetectionJobProperties": {"JobStatus": state}
         }
-        sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None, 
soft_fail=soft_fail)
+        sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None)
 
-        with pytest.raises(expected_exception, match=sensor.FAILURE_MESSAGE):
+        with pytest.raises(AirflowException, match=sensor.FAILURE_MESSAGE):
             sensor.poke({})
 
 
@@ -176,20 +169,13 @@ class TestComprehendCreateDocumentClassifierCompletedSensor:
         }
         assert self.sensor.poke({}) is False
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(False, AirflowException, id="not-soft-fail"),
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-        ],
-    )
     @pytest.mark.parametrize("state", SENSOR.FAILURE_STATES)
     @mock.patch.object(ComprehendHook, "conn")
-    def test_poke_failure_states(self, mock_conn, state, soft_fail, 
expected_exception):
+    def test_poke_failure_states(self, mock_conn, state):
         mock_conn.describe_document_classifier.return_value = {
             "DocumentClassifierProperties": {"Status": state}
         }
-        sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None, 
soft_fail=soft_fail)
+        sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None)
 
-        with pytest.raises(expected_exception, match=sensor.FAILURE_MESSAGE):
+        with pytest.raises(AirflowException, match=sensor.FAILURE_MESSAGE):
             sensor.poke({})
diff --git a/tests/providers/amazon/aws/sensors/test_dms.py b/tests/providers/amazon/aws/sensors/test_dms.py
index eb99949ea0..3ec2ffccc7 100644
--- a/tests/providers/amazon/aws/sensors/test_dms.py
+++ b/tests/providers/amazon/aws/sensors/test_dms.py
@@ -20,7 +20,7 @@ from unittest import mock
 
 import pytest
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.dms import DmsHook
 from airflow.providers.amazon.aws.sensors.dms import DmsTaskCompletedSensor
 
@@ -87,27 +87,13 @@ class TestDmsTaskCompletedSensor:
             "testing",
         ],
     )
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-            pytest.param(False, AirflowException, id="non-soft-fail"),
-        ],
-    )
-    def test_poke_terminated_status(self, mocked_get_task_status, status, 
soft_fail, expected_exception):
+    def test_poke_terminated_status(self, mocked_get_task_status, status):
         mocked_get_task_status.return_value = status
         error_message = f"Unexpected status: {status}"
         with pytest.raises(AirflowException, match=error_message):
-            DmsTaskCompletedSensor(**self.default_op_kwargs, 
soft_fail=soft_fail).poke({})
+            DmsTaskCompletedSensor(**self.default_op_kwargs).poke({})
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-            pytest.param(False, AirflowException, id="non-soft-fail"),
-        ],
-    )
-    def test_poke_none_status(self, mocked_get_task_status, soft_fail, 
expected_exception):
+    def test_poke_none_status(self, mocked_get_task_status):
         mocked_get_task_status.return_value = None
         with pytest.raises(AirflowException, match="task with ARN .* not 
found"):
-            DmsTaskCompletedSensor(**self.default_op_kwargs, 
soft_fail=soft_fail).poke({})
+            DmsTaskCompletedSensor(**self.default_op_kwargs).poke({})
diff --git a/tests/providers/amazon/aws/sensors/test_ecs.py 
b/tests/providers/amazon/aws/sensors/test_ecs.py
index 43210ceeb2..dbbcef8c3d 100644
--- a/tests/providers/amazon/aws/sensors/test_ecs.py
+++ b/tests/providers/amazon/aws/sensors/test_ecs.py
@@ -24,7 +24,7 @@ import boto3
 import pytest
 from slugify import slugify
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.sensors.ecs import (
     EcsBaseSensor,
     EcsClusterStates,
@@ -34,7 +34,6 @@ from airflow.providers.amazon.aws.sensors.ecs import (
     EcsTaskDefinitionStateSensor,
     EcsTaskStates,
     EcsTaskStateSensor,
-    _check_failed,
 )
 from airflow.utils import timezone
 from airflow.utils.types import NOTSET
@@ -265,20 +264,3 @@ class TestEcsTaskStateSensor(EcsBaseTestCase):
             with pytest.raises(AirflowException, match="Terminal state 
reached"):
                 task.poke({})
             m.assert_called_once_with(cluster=TEST_CLUSTER_NAME, 
task=TEST_TASK_ARN)
-
-
[email protected](
-    "soft_fail, expected_exception", ((False, AirflowException), (True, 
AirflowSkipException))
-)
-def test_fail__check_failed(soft_fail, expected_exception):
-    current_state = "FAILED"
-    target_state = "SUCCESS"
-    failure_states = ["FAILED"]
-    message = f"Terminal state reached. Current state: {current_state}, 
Expected state: {target_state}"
-    with pytest.raises(expected_exception, match=message):
-        _check_failed(
-            current_state=current_state,
-            target_state=target_state,
-            failure_states=failure_states,
-            soft_fail=soft_fail,
-        )
diff --git 
a/tests/providers/amazon/aws/sensors/test_emr_serverless_application.py 
b/tests/providers/amazon/aws/sensors/test_emr_serverless_application.py
index c35d84e7fa..e52d2ab9e4 100644
--- a/tests/providers/amazon/aws/sensors/test_emr_serverless_application.py
+++ b/tests/providers/amazon/aws/sensors/test_emr_serverless_application.py
@@ -21,7 +21,7 @@ from unittest.mock import MagicMock
 
 import pytest
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.sensors.emr import 
EmrServerlessApplicationSensor
 
 
@@ -75,16 +75,3 @@ class 
TestPokeRaisesAirflowException(TestEmrServerlessApplicationSensor):
 
         assert exception_msg == str(ctx.value)
         self.assert_get_application_was_called_once_with_app_id()
-
-
-class TestPokeRaisesAirflowSkipException(TestEmrServerlessApplicationSensor):
-    def test_when_state_is_failed_and_soft_fail_is_true_poke_should_raise_skip_exception(self):
-        self.sensor.soft_fail = True
-        self.set_get_application_return_value(
-            {"application": {"state": "STOPPED", "stateDetails": "mock 
stopped"}}
-        )
-        with pytest.raises(AirflowSkipException) as ctx:
-            self.sensor.poke(None)
-        assert "EMR Serverless application failed: mock stopped" == 
str(ctx.value)
-        self.assert_get_application_was_called_once_with_app_id()
-        self.sensor.soft_fail = False
diff --git a/tests/providers/amazon/aws/sensors/test_emr_serverless_job.py 
b/tests/providers/amazon/aws/sensors/test_emr_serverless_job.py
index 299efe3fd2..942abab677 100644
--- a/tests/providers/amazon/aws/sensors/test_emr_serverless_job.py
+++ b/tests/providers/amazon/aws/sensors/test_emr_serverless_job.py
@@ -21,7 +21,7 @@ from unittest.mock import MagicMock
 
 import pytest
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.sensors.emr import EmrServerlessJobSensor
 
 
@@ -78,14 +78,3 @@ class 
TestPokeRaisesAirflowException(TestEmrServerlessJobSensor):
 
         assert exception_msg == str(ctx.value)
         self.assert_get_job_run_was_called_once_with_app_and_run_id()
-
-
-class TestPokeRaisesAirflowSkipException(TestEmrServerlessJobSensor):
-    def test_when_state_is_failed_and_soft_fail_is_true_poke_should_raise_skip_exception(self):
-        self.sensor.soft_fail = True
-        self.set_get_job_run_return_value({"jobRun": {"state": "FAILED", 
"stateDetails": "mock failed"}})
-        with pytest.raises(AirflowSkipException) as ctx:
-            self.sensor.poke(None)
-        assert "EMR Serverless job failed: mock failed" == str(ctx.value)
-        self.assert_get_job_run_was_called_once_with_app_and_run_id()
-        self.sensor.soft_fail = False
diff --git a/tests/providers/amazon/aws/sensors/test_glacier.py 
b/tests/providers/amazon/aws/sensors/test_glacier.py
index f46a6c4729..2c93cfb58e 100644
--- a/tests/providers/amazon/aws/sensors/test_glacier.py
+++ b/tests/providers/amazon/aws/sensors/test_glacier.py
@@ -21,7 +21,7 @@ from unittest import mock
 
 import pytest
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.sensors.glacier import 
GlacierJobOperationSensor, JobStatus
 
 SUCCEEDED = "Succeeded"
@@ -77,19 +77,11 @@ class TestAmazonGlacierSensor:
         with pytest.raises(AirflowException, match="Sensor failed"):
             self.op.poke(None)
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(False, AirflowException, id="not-soft-fail"),
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-        ],
-    )
-    def test_fail_poke(self, soft_fail, expected_exception, 
mocked_describe_job):
-        self.op.soft_fail = soft_fail
+    def test_fail_poke(self, mocked_describe_job):
         response = {"Action": "some action", "StatusCode": "Failed"}
         message = f'Sensor failed. Job status: {response["Action"]}, code 
status: {response["StatusCode"]}'
         mocked_describe_job.return_value = response
-        with pytest.raises(expected_exception, match=message):
+        with pytest.raises(AirflowException, match=message):
             self.op.poke(context={})
 
 
diff --git a/tests/providers/amazon/aws/sensors/test_glue.py 
b/tests/providers/amazon/aws/sensors/test_glue.py
index d179feb98e..2d4925a016 100644
--- a/tests/providers/amazon/aws/sensors/test_glue.py
+++ b/tests/providers/amazon/aws/sensors/test_glue.py
@@ -21,7 +21,7 @@ from unittest.mock import ANY
 
 import pytest
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
 from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
 
@@ -134,11 +134,8 @@ class TestGlueJobSensor:
             continuation_tokens=ANY,
         )
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception", ((False, AirflowException), (True, 
AirflowSkipException))
-    )
     
@mock.patch("airflow.providers.amazon.aws.hooks.glue.GlueJobHook.get_job_state")
-    def test_fail_poke(self, get_job_state, soft_fail, expected_exception):
+    def test_fail_poke(self, get_job_state):
         job_name = "job_name"
         job_run_id = "job_run_id"
         op = GlueJobSensor(
@@ -150,9 +147,8 @@ class TestGlueJobSensor:
             verbose=True,
         )
         op.verbose = False
-        op.soft_fail = soft_fail
         job_state = "FAILED"
         get_job_state.return_value = job_state
         job_error_message = "Exiting Job"
-        with pytest.raises(expected_exception, match=job_error_message):
+        with pytest.raises(AirflowException, match=job_error_message):
             op.poke(context={})
diff --git a/tests/providers/amazon/aws/sensors/test_glue_catalog_partition.py 
b/tests/providers/amazon/aws/sensors/test_glue_catalog_partition.py
index 767a6ca086..f8556092d1 100644
--- a/tests/providers/amazon/aws/sensors/test_glue_catalog_partition.py
+++ b/tests/providers/amazon/aws/sensors/test_glue_catalog_partition.py
@@ -22,7 +22,7 @@ from unittest import mock
 import pytest
 from moto import mock_aws
 
-from airflow.exceptions import AirflowException, AirflowSkipException, 
TaskDeferred
+from airflow.exceptions import AirflowException, TaskDeferred
 from airflow.providers.amazon.aws.hooks.glue_catalog import GlueCatalogHook
 from airflow.providers.amazon.aws.sensors.glue_catalog_partition import 
GlueCatalogPartitionSensor
 
@@ -112,15 +112,11 @@ class TestGlueCatalogPartitionSensor:
         op.execute_complete(context={}, event=event)
         assert "Partition exists in the Glue Catalog" in caplog.messages
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception", ((False, AirflowException), (True, 
AirflowSkipException))
-    )
-    def test_fail_execute_complete(self, soft_fail, expected_exception):
+    def test_fail_execute_complete(self):
         op = GlueCatalogPartitionSensor(task_id=self.task_id, 
table_name="tbl", deferrable=True)
-        op.soft_fail = soft_fail
         event = {"status": "Failed"}
         message = f"Trigger error: event is {event}"
-        with pytest.raises(expected_exception, match=message):
+        with pytest.raises(AirflowException, match=message):
             op.execute_complete(context={}, event=event)
 
     def test_init(self):
diff --git a/tests/providers/amazon/aws/sensors/test_glue_crawler.py 
b/tests/providers/amazon/aws/sensors/test_glue_crawler.py
index 83b3795a1a..d5762c8ee6 100644
--- a/tests/providers/amazon/aws/sensors/test_glue_crawler.py
+++ b/tests/providers/amazon/aws/sensors/test_glue_crawler.py
@@ -20,7 +20,7 @@ from unittest import mock
 
 import pytest
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.glue_crawler import GlueCrawlerHook
 from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
 
@@ -53,16 +53,12 @@ class TestGlueCrawlerSensor:
         assert self.sensor.poke({}) is False
         mock_get_crawler.assert_called_once_with("aws_test_glue_crawler")
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception", ((False, AirflowException), (True, 
AirflowSkipException))
-    )
     
@mock.patch("airflow.providers.amazon.aws.hooks.glue_crawler.GlueCrawlerHook.get_crawler")
-    def test_fail_poke(self, get_crawler, soft_fail, expected_exception):
-        self.sensor.soft_fail = soft_fail
+    def test_fail_poke(self, get_crawler):
         crawler_status = "FAILED"
         get_crawler.return_value = {"State": "READY", "LastCrawl": {"Status": 
crawler_status}}
         message = f"Status: {crawler_status}"
-        with pytest.raises(expected_exception, match=message):
+        with pytest.raises(AirflowException, match=message):
             self.sensor.poke(context={})
 
     def test_base_aws_op_attributes(self):
diff --git a/tests/providers/amazon/aws/sensors/test_glue_data_quality.py 
b/tests/providers/amazon/aws/sensors/test_glue_data_quality.py
index a37bc0b700..585487f22b 100644
--- a/tests/providers/amazon/aws/sensors/test_glue_data_quality.py
+++ b/tests/providers/amazon/aws/sensors/test_glue_data_quality.py
@@ -20,7 +20,7 @@ from unittest import mock
 
 import pytest
 
-from airflow.exceptions import AirflowException, AirflowSkipException, 
TaskDeferred
+from airflow.exceptions import AirflowException, TaskDeferred
 from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook
 from airflow.providers.amazon.aws.sensors.glue import (
     GlueDataQualityRuleRecommendationRunSensor,
@@ -133,16 +133,9 @@ class TestGlueDataQualityRuleSetEvaluationRunSensor:
 
         assert self.sensor.poke({}) is False
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(False, AirflowException, id="not-soft-fail"),
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-        ],
-    )
     @pytest.mark.parametrize("state", SENSOR.FAILURE_STATES)
     @mock.patch.object(GlueDataQualityHook, "conn")
-    def test_poke_failure_states(self, mock_conn, state, soft_fail, 
expected_exception):
+    def test_poke_failure_states(self, mock_conn, state):
         mock_conn.get_data_quality_ruleset_evaluation_run.return_value = {
             "RunId": "12345",
             "Status": state,
@@ -150,11 +143,11 @@ class TestGlueDataQualityRuleSetEvaluationRunSensor:
             "ErrorString": "unknown error",
         }
 
-        sensor = self.SENSOR(**self.default_args, aws_conn_id=None, 
soft_fail=soft_fail)
+        sensor = self.SENSOR(**self.default_args, aws_conn_id=None)
 
         message = f"Error: AWS Glue data quality ruleset evaluation run RunId: 
12345 Run Status: {state}: unknown error"
 
-        with pytest.raises(expected_exception, match=message):
+        with pytest.raises(AirflowException, match=message):
             sensor.poke({})
 
         
mock_conn.get_data_quality_ruleset_evaluation_run.assert_called_once_with(RunId="12345")
@@ -247,29 +240,22 @@ class TestGlueDataQualityRuleRecommendationRunSensor:
         )
         assert self.sensor.poke({}) is False
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(False, AirflowException, id="not-soft-fail"),
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-        ],
-    )
     @pytest.mark.parametrize("state", SENSOR.FAILURE_STATES)
     @mock.patch.object(GlueDataQualityHook, "conn")
-    def test_poke_failure_states(self, mock_conn, state, soft_fail, 
expected_exception):
+    def test_poke_failure_states(self, mock_conn, state):
         mock_conn.get_data_quality_rule_recommendation_run.return_value = {
             "RunId": "12345",
             "Status": state,
             "ErrorString": "unknown error",
         }
 
-        sensor = self.SENSOR(**self.default_args, aws_conn_id=None, 
soft_fail=soft_fail)
+        sensor = self.SENSOR(**self.default_args, aws_conn_id=None)
 
         message = (
             f"Error: AWS Glue data quality recommendation run RunId: 12345 Run 
Status: {state}: unknown error"
         )
 
-        with pytest.raises(expected_exception, match=message):
+        with pytest.raises(AirflowException, match=message):
             sensor.poke({})
 
         
mock_conn.get_data_quality_rule_recommendation_run.assert_called_once_with(RunId="12345")
diff --git a/tests/providers/amazon/aws/sensors/test_kinesis_analytics.py 
b/tests/providers/amazon/aws/sensors/test_kinesis_analytics.py
index 73a2cfdb77..b335931de2 100644
--- a/tests/providers/amazon/aws/sensors/test_kinesis_analytics.py
+++ b/tests/providers/amazon/aws/sensors/test_kinesis_analytics.py
@@ -20,7 +20,7 @@ from unittest import mock
 
 import pytest
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.kinesis_analytics import 
KinesisAnalyticsV2Hook
 from airflow.providers.amazon.aws.sensors.kinesis_analytics import (
     KinesisAnalyticsV2StartApplicationCompletedSensor,
@@ -78,24 +78,17 @@ class TestKinesisAnalyticsV2StartApplicationCompletedSensor:
         }
         assert self.sensor.poke({}) is False
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(False, AirflowException, id="not-soft-fail"),
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-        ],
-    )
     @pytest.mark.parametrize("state", SENSOR.FAILURE_STATES)
     @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
-    def test_poke_failure_states(self, mock_conn, state, soft_fail, 
expected_exception):
+    def test_poke_failure_states(self, mock_conn, state):
         mock_conn.describe_application.return_value = {
             "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN, 
"ApplicationStatus": state}
         }
 
-        sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None, 
soft_fail=soft_fail)
+        sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None)
 
         with pytest.raises(
-            expected_exception, match="AWS Managed Service for Apache Flink 
application start failed"
+            AirflowException, match="AWS Managed Service for Apache Flink 
application start failed"
         ):
             sensor.poke({})
 
@@ -150,23 +143,16 @@ class 
TestKinesisAnalyticsV2StopApplicationCompletedSensor:
         }
         assert self.sensor.poke({}) is False
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(False, AirflowException, id="not-soft-fail"),
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-        ],
-    )
     @pytest.mark.parametrize("state", SENSOR.FAILURE_STATES)
     @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
-    def test_poke_failure_states(self, mock_conn, state, soft_fail, 
expected_exception):
+    def test_poke_failure_states(self, mock_conn, state):
         mock_conn.describe_application.return_value = {
             "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN, 
"ApplicationStatus": state}
         }
 
-        sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None, 
soft_fail=soft_fail)
+        sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None)
 
         with pytest.raises(
-            expected_exception, match="AWS Managed Service for Apache Flink 
application stop failed"
+            AirflowException, match="AWS Managed Service for Apache Flink 
application stop failed"
         ):
             sensor.poke({})
diff --git a/tests/providers/amazon/aws/sensors/test_lambda_function.py 
b/tests/providers/amazon/aws/sensors/test_lambda_function.py
index 8f537ead18..2e1fbe8981 100644
--- a/tests/providers/amazon/aws/sensors/test_lambda_function.py
+++ b/tests/providers/amazon/aws/sensors/test_lambda_function.py
@@ -20,7 +20,7 @@ from unittest import mock
 
 import pytest
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook
 from airflow.providers.amazon.aws.sensors.lambda_function import 
LambdaFunctionStateSensor
 
@@ -87,17 +87,13 @@ class TestLambdaFunctionStateSensor:
                 FunctionName=FUNCTION_NAME,
             )
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception", ((False, AirflowException), (True, 
AirflowSkipException))
-    )
-    def test_fail_poke(self, soft_fail, expected_exception):
+    def test_fail_poke(self):
         sensor = LambdaFunctionStateSensor(
             task_id="test_sensor",
             function_name=FUNCTION_NAME,
         )
-        sensor.soft_fail = soft_fail
         message = "Lambda function state sensor failed because the Lambda is 
in a failed state"
         with mock.patch("airflow.providers.amazon.aws.hooks.lambda_function.LambdaHook.conn") as conn:
             conn.get_function.return_value = {"Configuration": {"State": 
"Failed"}}
-            with pytest.raises(expected_exception, match=message):
+            with pytest.raises(AirflowException, match=message):
                 sensor.poke(context={})
diff --git a/tests/providers/amazon/aws/sensors/test_opensearch_serverless.py 
b/tests/providers/amazon/aws/sensors/test_opensearch_serverless.py
index 3b7474aeac..5543934468 100644
--- a/tests/providers/amazon/aws/sensors/test_opensearch_serverless.py
+++ b/tests/providers/amazon/aws/sensors/test_opensearch_serverless.py
@@ -20,7 +20,7 @@ from unittest import mock
 
 import pytest
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.opensearch_serverless import 
OpenSearchServerlessHook
 from airflow.providers.amazon.aws.sensors.opensearch_serverless import (
     OpenSearchServerlessCollectionActiveSensor,
@@ -95,19 +95,10 @@ class TestOpenSearchServerlessCollectionActiveSensor:
         mock_conn.batch_get_collection.return_value = {"collectionDetails": 
[{"status": state}]}
         assert self.sensor.poke({}) is False
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(False, AirflowException, id="not-soft-fail"),
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-        ],
-    )
     @pytest.mark.parametrize("state", 
list(OpenSearchServerlessCollectionActiveSensor.FAILURE_STATES))
     @mock.patch.object(OpenSearchServerlessHook, "conn")
-    def test_poke_failure_states(self, mock_conn, state, soft_fail, 
expected_exception):
+    def test_poke_failure_states(self, mock_conn, state):
         mock_conn.batch_get_collection.return_value = {"collectionDetails": 
[{"status": state}]}
-        sensor = OpenSearchServerlessCollectionActiveSensor(
-            **self.default_op_kwargs, aws_conn_id=None, soft_fail=soft_fail
-        )
-        with pytest.raises(expected_exception, match=sensor.FAILURE_MESSAGE):
+        sensor = 
OpenSearchServerlessCollectionActiveSensor(**self.default_op_kwargs, 
aws_conn_id=None)
+        with pytest.raises(AirflowException, match=sensor.FAILURE_MESSAGE):
             sensor.poke({})
diff --git a/tests/providers/amazon/aws/sensors/test_quicksight.py 
b/tests/providers/amazon/aws/sensors/test_quicksight.py
index 9eb9a49587..46890a69cb 100644
--- a/tests/providers/amazon/aws/sensors/test_quicksight.py
+++ b/tests/providers/amazon/aws/sensors/test_quicksight.py
@@ -21,7 +21,7 @@ from unittest import mock
 
 import pytest
 
-from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning, AirflowSkipException
+from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.providers.amazon.aws.hooks.quicksight import QuickSightHook
 from airflow.providers.amazon.aws.sensors.quicksight import QuickSightSensor
 
@@ -88,20 +88,11 @@ class TestQuickSightSensor:
         mocked_get_status.assert_called_once_with(None, DATA_SET_ID, 
INGESTION_ID)
 
     @pytest.mark.parametrize("status", ["FAILED", "CANCELLED"])
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-            pytest.param(False, AirflowException, id="non-soft-fail"),
-        ],
-    )
-    def test_poke_terminated_status(
-        self, status, soft_fail, expected_exception, mocked_get_status, 
mocked_get_error_info
-    ):
+    def test_poke_terminated_status(self, status, mocked_get_status, 
mocked_get_error_info):
         mocked_get_status.return_value = status
         mocked_get_error_info.return_value = "something bad happen"
-        with pytest.raises(expected_exception, match="Error info: something 
bad happen"):
-            QuickSightSensor(**self.default_op_kwargs, 
soft_fail=soft_fail).poke({})
+        with pytest.raises(AirflowException, match="Error info: something bad 
happen"):
+            QuickSightSensor(**self.default_op_kwargs).poke({})
         mocked_get_status.assert_called_once_with(None, DATA_SET_ID, 
INGESTION_ID)
         mocked_get_error_info.assert_called_once_with(None, DATA_SET_ID, 
INGESTION_ID)
 
diff --git a/tests/providers/amazon/aws/sensors/test_s3.py 
b/tests/providers/amazon/aws/sensors/test_s3.py
index 2d9ee9d52e..3c5606dcbe 100644
--- a/tests/providers/amazon/aws/sensors/test_s3.py
+++ b/tests/providers/amazon/aws/sensors/test_s3.py
@@ -24,7 +24,7 @@ import pytest
 import time_machine
 from moto import mock_aws
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.models import DAG, DagRun, TaskInstance
 from airflow.models.variable import Variable
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
@@ -282,18 +282,14 @@ class TestS3KeySensor:
             sensor.execute_complete(context={}, event={"status": "running", 
"files": [{"Size": 10}]}) is None
         )
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception", ((False, AirflowException), (True, 
AirflowSkipException))
-    )
-    def test_fail_execute_complete(self, soft_fail, expected_exception):
+    def test_fail_execute_complete(self):
         op = S3KeySensor(
             task_id="s3_key_sensor",
             bucket_key=["s3://test_bucket/file*", "s3://test_bucket/*.zip"],
             wildcard_match=True,
         )
-        op.soft_fail = soft_fail
         message = "error"
-        with pytest.raises(expected_exception, match=message):
+        with pytest.raises(AirflowException, match=message):
             op.execute_complete(context={}, event={"status": "error", 
"message": message})
 
     @mock_aws
@@ -524,25 +520,17 @@ class TestS3KeysUnchangedSensor:
         time_machine.coordinates.shift(10)
         assert self.sensor.poke(dict())
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception", ((False, AirflowException), (True, 
AirflowSkipException))
-    )
-    def test_fail_is_keys_unchanged(self, soft_fail, expected_exception):
+    def test_fail_is_keys_unchanged(self):
         op = S3KeysUnchangedSensor(task_id="sensor", 
bucket_name="test-bucket", prefix="test-prefix/path")
-        op.soft_fail = soft_fail
         op.previous_objects = {"1", "2", "3"}
         current_objects = {"1", "2"}
         op.allow_delete = False
         message = "Illegal behavior: objects were deleted in"
-        with pytest.raises(expected_exception, match=message):
+        with pytest.raises(AirflowException, match=message):
             op.is_keys_unchanged(current_objects=current_objects)
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception", ((False, AirflowException), (True, 
AirflowSkipException))
-    )
-    def test_fail_execute_complete(self, soft_fail, expected_exception):
+    def test_fail_execute_complete(self):
         op = S3KeysUnchangedSensor(task_id="sensor", 
bucket_name="test-bucket", prefix="test-prefix/path")
-        op.soft_fail = soft_fail
         message = "test message"
-        with pytest.raises(expected_exception, match=message):
+        with pytest.raises(AirflowException, match=message):
             op.execute_complete(context={}, event={"status": "error", 
"message": message})
diff --git a/tests/providers/amazon/aws/sensors/test_sagemaker_base.py 
b/tests/providers/amazon/aws/sensors/test_sagemaker_base.py
index bc324473dc..c8a3ade2a1 100644
--- a/tests/providers/amazon/aws/sensors/test_sagemaker_base.py
+++ b/tests/providers/amazon/aws/sensors/test_sagemaker_base.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 
 import pytest
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.sensors.sagemaker import SageMakerBaseSensor
 
 
@@ -110,10 +110,7 @@ class TestSagemakerBaseSensor:
         with pytest.raises(AirflowException):
             sensor.poke(None)
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception", ((False, AirflowException), (True, 
AirflowSkipException))
-    )
-    def test_fail_poke(self, soft_fail, expected_exception):
+    def test_fail_poke(self):
         resource_type = "job"
 
         class SageMakerBaseSensorSubclass(SageMakerBaseSensor):
@@ -132,10 +129,9 @@ class TestSagemakerBaseSensor:
         sensor = SageMakerBaseSensorSubclass(
             task_id="test_task", poke_interval=2, aws_conn_id="aws_test", 
resource_type=resource_type
         )
-        sensor.soft_fail = soft_fail
         message = (
             f"Sagemaker {resource_type} failed for the following reason:"
             f" {sensor.get_failed_reason_from_response({})}"
         )
-        with pytest.raises(expected_exception, match=message):
+        with pytest.raises(AirflowException, match=message):
             sensor.poke(context={})
diff --git a/tests/providers/amazon/aws/sensors/test_sqs.py 
b/tests/providers/amazon/aws/sensors/test_sqs.py
index 0cddcfb36d..457195313c 100644
--- a/tests/providers/amazon/aws/sensors/test_sqs.py
+++ b/tests/providers/amazon/aws/sensors/test_sqs.py
@@ -23,7 +23,7 @@ from unittest import mock
 import pytest
 from moto import mock_aws
 
-from airflow.exceptions import AirflowException, AirflowSkipException, 
TaskDeferred
+from airflow.exceptions import AirflowException, TaskDeferred
 from airflow.providers.amazon.aws.hooks.sqs import SqsHook
 from airflow.providers.amazon.aws.sensors.sqs import SqsSensor
 
@@ -418,31 +418,17 @@ class TestSqsSensor:
         with pytest.raises(TaskDeferred):
             sensor.execute({})
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-            pytest.param(False, AirflowException, id="non-soft-fail"),
-        ],
-    )
-    def test_fail_execute_complete(self, soft_fail, expected_exception):
-        sensor = SqsSensor(**self.default_op_kwargs, deferrable=True, 
soft_fail=soft_fail)
+    def test_fail_execute_complete(self):
+        sensor = SqsSensor(**self.default_op_kwargs, deferrable=True)
         event = {"status": "failed"}
         message = f"Trigger error: event is {event}"
-        with pytest.raises(expected_exception, match=message):
+        with pytest.raises(AirflowException, match=message):
             sensor.execute_complete(context={}, event=event)
 
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-            pytest.param(False, AirflowException, id="non-soft-fail"),
-        ],
-    )
     @mock.patch("airflow.providers.amazon.aws.sensors.sqs.SqsSensor.poll_sqs")
     @mock.patch("airflow.providers.amazon.aws.sensors.sqs.process_response")
     @mock.patch("airflow.providers.amazon.aws.hooks.sqs.SqsHook.conn")
-    def test_fail_poke(self, mocked_client, process_response, poll_sqs, 
soft_fail, expected_exception):
+    def test_fail_poke(self, mocked_client, process_response, poll_sqs):
         response = "error message"
         messages = [{"MessageId": "1", "ReceiptHandle": "test"}]
         poll_sqs.return_value = response
@@ -450,6 +436,6 @@ class TestSqsSensor:
         mocked_client.delete_message_batch.return_value = response
         error_message = f"Delete SQS Messages failed {response} for messages"
 
-        sensor = SqsSensor(**self.default_op_kwargs, soft_fail=soft_fail)
-        with pytest.raises(expected_exception, match=error_message):
+        sensor = SqsSensor(**self.default_op_kwargs)
+        with pytest.raises(AirflowException, match=error_message):
             sensor.poke(context={})
diff --git a/tests/providers/amazon/aws/sensors/test_step_function.py b/tests/providers/amazon/aws/sensors/test_step_function.py
index 878691dc1c..5289fb053e 100644
--- a/tests/providers/amazon/aws/sensors/test_step_function.py
+++ b/tests/providers/amazon/aws/sensors/test_step_function.py
@@ -22,7 +22,7 @@ from unittest import mock
 
 import pytest
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.sensors.step_function import StepFunctionExecutionSensor
 
 TASK_ID = "step_function_execution_sensor"
@@ -78,19 +78,10 @@ class TestStepFunctionExecutionSensor:
 
     @mock.patch.object(StepFunctionExecutionSensor, "hook")
     @pytest.mark.parametrize("status", StepFunctionExecutionSensor.FAILURE_STATES)
-    @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
-        [
-            pytest.param(True, AirflowSkipException, id="soft-fail"),
-            pytest.param(False, AirflowException, id="non-soft-fail"),
-        ],
-    )
-    def test_failure(self, mocked_hook, status, soft_fail, expected_exception, mocked_context):
+    def test_failure(self, mocked_hook, status, mocked_context):
         output = {"test": "test"}
         mocked_hook.describe_execution.return_value = {"status": status, "output": json.dumps(output)}
-        sensor = StepFunctionExecutionSensor(
-            task_id=TASK_ID, execution_arn=EXECUTION_ARN, aws_conn_id=None, soft_fail=soft_fail
-        )
+        sensor = StepFunctionExecutionSensor(task_id=TASK_ID, execution_arn=EXECUTION_ARN, aws_conn_id=None)
         message = f"Step Function sensor failed. State Machine Output: {output}"
-        with pytest.raises(expected_exception, match=message):
+        with pytest.raises(AirflowException, match=message):
             sensor.poke(mocked_context)


Reply via email to