This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 518926a5a60 More robust handling of `BaseHook.get_connection`'s `CONNECTION_NOT_FOUND` Task SDK exception (#52838) 518926a5a60 is described below commit 518926a5a600be51bf1f00c346067f4981a362e7 Author: Ramit Kataria <ramit...@amazon.com> AuthorDate: Fri Jul 4 11:21:22 2025 -0700 More robust handling of `BaseHook.get_connection`'s `CONNECTION_NOT_FOUND` Task SDK exception (#52838) After #51873, the exception that the base hook's `get_connection` raises when a connection is not found changed, and it was no longer being caught by `AwsGenericHook`. This change fixes that with a more robust check: instead of matching on the exception's message text, the handler now treats the error as "connection not found" when it is either an `AirflowNotFoundException` or (on Airflow 3.0+) an `AirflowRuntimeError` whose error type is `ErrorType.CONNECTION_NOT_FOUND`. At some point, we should probably make the connection getter raise exceptions in a more consistent manner, maybe once we have fully switched over to Task SDK. --- .../airflow/providers/amazon/aws/hooks/base_aws.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/base_aws.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/base_aws.py index 18e9d396230..524af9722da 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/base_aws.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/base_aws.py @@ -82,7 +82,7 @@ BaseAwsConnection = TypeVar("BaseAwsConnection", bound=Union[BaseClient, Service if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.exceptions import AirflowRuntimeError + from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType if TYPE_CHECKING: from aiobotocore.session import AioSession @@ -623,19 +623,16 @@ class AwsGenericHook(BaseHook, Generic[BaseAwsConnection]): """Get the Airflow Connection object and wrap it in helper (cached).""" connection = None if self.aws_conn_id: - possible_exceptions: tuple[type[Exception], ...] 
- - if AIRFLOW_V_3_0_PLUS: - possible_exceptions = (AirflowNotFoundException, AirflowRuntimeError) - else: - possible_exceptions = (AirflowNotFoundException,) - try: connection = self.get_connection(self.aws_conn_id) - except possible_exceptions as e: - if isinstance( - e, AirflowNotFoundException - ) or f"Connection with ID {self.aws_conn_id} not found" in str(e): + except Exception as e: + not_found_exc_via_core = isinstance(e, AirflowNotFoundException) + not_found_exc_via_task_sdk = ( + AIRFLOW_V_3_0_PLUS + and isinstance(e, AirflowRuntimeError) + and e.error.error == ErrorType.CONNECTION_NOT_FOUND + ) + if not_found_exc_via_core or not_found_exc_via_task_sdk: self.log.warning( "Unable to find AWS Connection ID '%s', switching to empty.", self.aws_conn_id )