This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 58e5d02302c Improve debuggability of SQS, Lambda, EC2, and RDS hooks 
(#64661)
58e5d02302c is described below

commit 58e5d02302c2d13885e8a151489276e899f5d315
Author: Shivam Rastogi <[email protected]>
AuthorDate: Mon Apr 6 15:16:25 2026 -0700

    Improve debuggability of SQS, Lambda, EC2, and RDS hooks (#64661)
    
    - Add debug logging before/after API calls in all four hooks,
      following the pattern from the Airbyte provider (#51503) and
      DynamoDB hook (#64629)
    - Fix `raise e` → bare `raise` in RDS hook (get_export_task_state,
      get_event_subscription_state) to avoid misleading extra traceback
      frames
---
 .../src/airflow/providers/amazon/aws/hooks/ec2.py  | 25 ++++++++++--
 .../providers/amazon/aws/hooks/lambda_function.py  | 29 +++++++++++++-
 .../src/airflow/providers/amazon/aws/hooks/rds.py  | 46 +++++++++++++++-------
 .../src/airflow/providers/amazon/aws/hooks/sqs.py  | 19 +++++++--
 4 files changed, 96 insertions(+), 23 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/ec2.py 
b/providers/amazon/src/airflow/providers/amazon/aws/hooks/ec2.py
index 36bc6cf3a29..71cf3abb4da 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/ec2.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/ec2.py
@@ -89,6 +89,7 @@ class EC2Hook(AwsBaseHook):
         :param filters: List of filters to specify instances to get
         :return: Instance object
         """
+        self.log.debug("Getting EC2 instance %s with filters %s", instance_id, 
filters)
         if self._api_type == "client_type":
             return self.get_instances(filters=filters, 
instance_ids=[instance_id])[0]
 
@@ -104,7 +105,9 @@ class EC2Hook(AwsBaseHook):
         """
         self.log.info("Stopping instances: %s", instance_ids)
 
-        return self.conn.stop_instances(InstanceIds=instance_ids)
+        result = self.conn.stop_instances(InstanceIds=instance_ids)
+        self.log.debug("stop_instances response: %s", 
result.get("StoppingInstances"))
+        return result
 
     @only_client_type
     def start_instances(self, instance_ids: list) -> dict:
@@ -116,7 +119,9 @@ class EC2Hook(AwsBaseHook):
         """
         self.log.info("Starting instances: %s", instance_ids)
 
-        return self.conn.start_instances(InstanceIds=instance_ids)
+        result = self.conn.start_instances(InstanceIds=instance_ids)
+        self.log.debug("start_instances response: %s", 
result.get("StartingInstances"))
+        return result
 
     @only_client_type
     def terminate_instances(self, instance_ids: list) -> dict:
@@ -128,7 +133,9 @@ class EC2Hook(AwsBaseHook):
         """
         self.log.info("Terminating instances: %s", instance_ids)
 
-        return self.conn.terminate_instances(InstanceIds=instance_ids)
+        result = self.conn.terminate_instances(InstanceIds=instance_ids)
+        self.log.debug("terminate_instances response: %s", 
result.get("TerminatingInstances"))
+        return result
 
     @only_client_type
     def describe_instances(self, filters: list | None = None, instance_ids: 
list | None = None):
@@ -173,9 +180,12 @@ class EC2Hook(AwsBaseHook):
         return [instance["InstanceId"] for instance in 
self.get_instances(filters=filters)]
 
     async def get_instance_state_async(self, instance_id: str) -> str:
+        self.log.debug("Getting instance state (async) for %s", instance_id)
         async with await self.get_async_conn() as client:
             response = await 
client.describe_instances(InstanceIds=[instance_id])
-            return response["Reservations"][0]["Instances"][0]["State"]["Name"]
+            state = 
response["Reservations"][0]["Instances"][0]["State"]["Name"]
+        self.log.debug("Instance %s state (async): %s", instance_id, state)
+        return state
 
     def get_instance_state(self, instance_id: str) -> str:
         """
@@ -200,8 +210,15 @@ class EC2Hook(AwsBaseHook):
         :return: None
         """
         instance_state = self.get_instance_state(instance_id=instance_id)
+        self.log.debug(
+            "Waiting for instance %s to reach state '%s', current state: '%s'",
+            instance_id,
+            target_state,
+            instance_state,
+        )
 
         while instance_state != target_state:
+            self.log.debug("Sleeping %ss before next state check", 
check_interval)
             time.sleep(check_interval)
             instance_state = self.get_instance_state(instance_id=instance_id)
 
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/hooks/lambda_function.py 
b/providers/amazon/src/airflow/providers/amazon/aws/hooks/lambda_function.py
index dea2a4272b2..605d1151355 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/lambda_function.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/lambda_function.py
@@ -80,7 +80,19 @@ class LambdaHook(AwsBaseHook):
             "Payload": payload,
             "Qualifier": qualifier,
         }
-        return self.conn.invoke(**trim_none_values(invoke_args))
+        self.log.debug(
+            "Invoking Lambda function %s with invocation type %s, qualifier 
%s",
+            function_name,
+            invocation_type,
+            qualifier,
+        )
+        response = self.conn.invoke(**trim_none_values(invoke_args))
+        self.log.debug(
+            "Lambda invoke response: StatusCode=%s, FunctionError=%s",
+            response.get("StatusCode"),
+            response.get("FunctionError"),
+        )
+        return response
 
     def create_lambda(
         self,
@@ -192,7 +204,20 @@ class LambdaHook(AwsBaseHook):
             "SnapStart": snap_start,
             "LoggingConfig": logging_config,
         }
-        return 
self.conn.create_function(**trim_none_values(create_function_args))
+        self.log.debug(
+            "Creating Lambda function %s with runtime %s, handler %s, package 
type %s",
+            function_name,
+            runtime,
+            handler,
+            package_type,
+        )
+        response = 
self.conn.create_function(**trim_none_values(create_function_args))
+        self.log.debug(
+            "Lambda function created: ARN=%s, State=%s",
+            response.get("FunctionArn"),
+            response.get("State"),
+        )
+        return response
 
     @staticmethod
     @return_on_error(None)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/rds.py 
b/providers/amazon/src/airflow/providers/amazon/aws/hooks/rds.py
index 29124b2685c..caabc4bb3e0 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/rds.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/rds.py
@@ -61,11 +61,14 @@ class RdsHook(AwsGenericHook["RDSClient"]):
         :return: Returns the status of the DB snapshot as a string (eg. 
"available")
         :raises AirflowNotFoundException: If the DB instance snapshot does not 
exist.
         """
+        self.log.debug("Retrieving state for DB snapshot %s", snapshot_id)
         try:
             response = 
self.conn.describe_db_snapshots(DBSnapshotIdentifier=snapshot_id)
         except self.conn.exceptions.DBSnapshotNotFoundFault as e:
-            raise AirflowNotFoundException(e)
-        return response["DBSnapshots"][0]["Status"].lower()
+            raise AirflowNotFoundException(e) from e
+        state = response["DBSnapshots"][0]["Status"].lower()
+        self.log.debug("DB snapshot %s state: %s", snapshot_id, state)
+        return state
 
     def wait_for_db_snapshot_state(
         self, snapshot_id: str, target_state: str, check_interval: int = 30, 
max_attempts: int = 40
@@ -107,11 +110,14 @@ class RdsHook(AwsGenericHook["RDSClient"]):
         :return: Returns the status of the DB cluster snapshot as a string 
(eg. "available")
         :raises AirflowNotFoundException: If the DB cluster snapshot does not 
exist.
         """
+        self.log.debug("Retrieving state for DB cluster snapshot %s", 
snapshot_id)
         try:
             response = 
self.conn.describe_db_cluster_snapshots(DBClusterSnapshotIdentifier=snapshot_id)
         except self.conn.exceptions.DBClusterSnapshotNotFoundFault as e:
-            raise AirflowNotFoundException(e)
-        return response["DBClusterSnapshots"][0]["Status"].lower()
+            raise AirflowNotFoundException(e) from e
+        state = response["DBClusterSnapshots"][0]["Status"].lower()
+        self.log.debug("DB cluster snapshot %s state: %s", snapshot_id, state)
+        return state
 
     def wait_for_db_cluster_snapshot_state(
         self, snapshot_id: str, target_state: str, check_interval: int = 30, 
max_attempts: int = 40
@@ -153,13 +159,16 @@ class RdsHook(AwsGenericHook["RDSClient"]):
         :return: Returns the status of the snapshot export task as a string 
(eg. "canceled")
         :raises AirflowNotFoundException: If the export task does not exist.
         """
+        self.log.debug("Retrieving state for export task %s", export_task_id)
         try:
             response = 
self.conn.describe_export_tasks(ExportTaskIdentifier=export_task_id)
         except self.conn.exceptions.ClientError as e:
             if e.response["Error"]["Code"] in ("ExportTaskNotFound", 
"ExportTaskNotFoundFault"):
-                raise AirflowNotFoundException(e)
-            raise e
-        return response["ExportTasks"][0]["Status"].lower()
+                raise AirflowNotFoundException(e) from e
+            raise
+        state = response["ExportTasks"][0]["Status"].lower()
+        self.log.debug("Export task %s state: %s", export_task_id, state)
+        return state
 
     def wait_for_export_task_state(
         self, export_task_id: str, target_state: str, check_interval: int = 
30, max_attempts: int = 40
@@ -194,13 +203,16 @@ class RdsHook(AwsGenericHook["RDSClient"]):
         :return: Returns the status of the event subscription as a string (eg. 
"active")
         :raises AirflowNotFoundException: If the event subscription does not 
exist.
         """
+        self.log.debug("Retrieving state for event subscription %s", 
subscription_name)
         try:
             response = 
self.conn.describe_event_subscriptions(SubscriptionName=subscription_name)
         except self.conn.exceptions.ClientError as e:
             if e.response["Error"]["Code"] in ("SubscriptionNotFoundFault", 
"SubscriptionNotFound"):
-                raise AirflowNotFoundException(e)
-            raise e
-        return response["EventSubscriptionsList"][0]["Status"].lower()
+                raise AirflowNotFoundException(e) from e
+            raise
+        state = response["EventSubscriptionsList"][0]["Status"].lower()
+        self.log.debug("Event subscription %s state: %s", subscription_name, 
state)
+        return state
 
     def wait_for_event_subscription_state(
         self, subscription_name: str, target_state: str, check_interval: int = 
30, max_attempts: int = 40
@@ -235,11 +247,14 @@ class RdsHook(AwsGenericHook["RDSClient"]):
         :return: Returns the status of the DB instance as a string (eg. 
"available")
         :raises AirflowNotFoundException: If the DB instance does not exist.
         """
+        self.log.debug("Retrieving state for DB instance %s", db_instance_id)
         try:
             response = 
self.conn.describe_db_instances(DBInstanceIdentifier=db_instance_id)
         except self.conn.exceptions.DBInstanceNotFoundFault as e:
-            raise AirflowNotFoundException(e)
-        return response["DBInstances"][0]["DBInstanceStatus"].lower()
+            raise AirflowNotFoundException(e) from e
+        state = response["DBInstances"][0]["DBInstanceStatus"].lower()
+        self.log.debug("DB instance %s state: %s", db_instance_id, state)
+        return state
 
     def wait_for_db_instance_state(
         self, db_instance_id: str, target_state: str, check_interval: int = 
30, max_attempts: int = 40
@@ -286,11 +301,14 @@ class RdsHook(AwsGenericHook["RDSClient"]):
         :return: Returns the status of the DB cluster as a string (eg. 
"available")
         :raises AirflowNotFoundException: If the DB cluster does not exist.
         """
+        self.log.debug("Retrieving state for DB cluster %s", db_cluster_id)
         try:
             response = 
self.conn.describe_db_clusters(DBClusterIdentifier=db_cluster_id)
         except self.conn.exceptions.DBClusterNotFoundFault as e:
-            raise AirflowNotFoundException(e)
-        return response["DBClusters"][0]["Status"].lower()
+            raise AirflowNotFoundException(e) from e
+        state = response["DBClusters"][0]["Status"].lower()
+        self.log.debug("DB cluster %s state: %s", db_cluster_id, state)
+        return state
 
     def wait_for_db_cluster_state(
         self, db_cluster_id: str, target_state: str, check_interval: int = 30, 
max_attempts: int = 40
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/sqs.py 
b/providers/amazon/src/airflow/providers/amazon/aws/hooks/sqs.py
index 1f4e8a36b2a..f813e6cee5a 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/sqs.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/sqs.py
@@ -51,7 +51,10 @@ class SqsHook(AwsBaseHook):
         :param attributes: additional attributes for the queue (default: None)
         :return: dict with the information about the queue.
         """
-        return self.get_conn().create_queue(QueueName=queue_name, 
Attributes=attributes or {})
+        self.log.debug("Creating SQS queue %s with attributes %s", queue_name, 
attributes)
+        result = self.get_conn().create_queue(QueueName=queue_name, 
Attributes=attributes or {})
+        self.log.debug("Created SQS queue %s, response: %s", queue_name, 
result.get("QueueUrl"))
+        return result
 
     @staticmethod
     def _build_msg_params(
@@ -104,7 +107,10 @@ class SqsHook(AwsBaseHook):
             message_group_id=message_group_id,
             message_deduplication_id=message_deduplication_id,
         )
-        return self.get_conn().send_message(**params)
+        self.log.debug("Sending message to SQS queue %s with delay %ds", 
queue_url, delay_seconds)
+        result = self.get_conn().send_message(**params)
+        self.log.debug("Message sent to %s, MessageId: %s", queue_url, 
result.get("MessageId"))
+        return result
 
     async def asend_message(
         self,
@@ -138,5 +144,12 @@ class SqsHook(AwsBaseHook):
             message_deduplication_id=message_deduplication_id,
         )
 
+        self.log.debug(
+            "Sending message (async) to SQS queue %s with delay %ds",
+            queue_url,
+            delay_seconds,
+        )
         async with await self.get_async_conn() as async_conn:
-            return await async_conn.send_message(**params)
+            result = await async_conn.send_message(**params)
+        self.log.debug("Message sent (async) to %s, MessageId: %s", queue_url, 
result.get("MessageId"))
+        return result

Reply via email to