o-nikolas commented on code in PR #57753:
URL: https://github.com/apache/airflow/pull/57753#discussion_r2492396406


##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/ssm.py:
##########
@@ -89,14 +94,47 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
 
             for instance_id in instance_ids:
                 self.waiter_args["InstanceId"] = instance_id
-                await async_wait(
-                    waiter,
-                    self.waiter_delay,
-                    self.attempts,
-                    self.waiter_args,
-                    self.failure_message,
-                    self.status_message,
-                    self.status_queries,
-                )
+                try:
+                    await async_wait(
+                        waiter,
+                        self.waiter_delay,
+                        self.attempts,
+                        self.waiter_args,
+                        self.failure_message,
+                        self.status_message,
+                        self.status_queries,
+                    )
+                except Exception:

Review Comment:
   It looks like you repeat this pattern three separate times, with lots of duplicated log strings, state sets/tuples, etc. Can this logic be extracted into a helper method that's defined once and used in all three places?
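   For illustration, here is a minimal sketch of what such a helper could look like. It assumes the helper only needs the `GetCommandInvocation` response dict and the `fail_on_nonzero_exit` flag. The names `should_tolerate_failure` and `AWS_LEVEL_FAILURE_STATUSES` are hypothetical, and the status set simply mirrors the tuple the PR currently uses:

```python
import logging

# Hypothetical module-level constant, defined once instead of repeating the tuple
# in the operator and the trigger. Mirrors the statuses the PR treats as AWS-level.
AWS_LEVEL_FAILURE_STATUSES = frozenset({"Cancelled", "TimedOut"})

log = logging.getLogger(__name__)


def should_tolerate_failure(invocation: dict, fail_on_nonzero_exit: bool) -> bool:
    """Return True if a waiter failure can be swallowed, False if the caller should re-raise.

    Hypothetical helper: name, signature and log text are illustrative, not from the PR.
    """
    if fail_on_nonzero_exit:
        # Assumed to match the current behavior: every waiter failure propagates.
        return False
    status = invocation.get("Status", "")
    if status in AWS_LEVEL_FAILURE_STATUSES:
        return False
    # Single shared log string for the tolerated command-level failure case.
    log.info(
        "Command finished with status %s (exit code %s); not failing the task.",
        status,
        invocation.get("ResponseCode"),
    )
    return True
```

   The operator and the trigger could each call this inside their `except` block and re-raise when it returns `False`. Because it is a plain function that makes no AWS calls, it works the same from the async trigger as from the sync operator.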



##########
providers/amazon/src/airflow/providers/amazon/aws/operators/ssm.py:
##########
@@ -132,14 +143,36 @@ def execute(self, context: Context):
 
             instance_ids = response["Command"]["InstanceIds"]
             for instance_id in instance_ids:
-                waiter.wait(
-                    CommandId=command_id,
-                    InstanceId=instance_id,
-                    WaiterConfig={
-                        "Delay": self.waiter_delay,
-                        "MaxAttempts": self.waiter_max_attempts,
-                    },
-                )
+                try:
+                    waiter.wait(
+                        CommandId=command_id,
+                        InstanceId=instance_id,
+                        WaiterConfig={
+                            "Delay": self.waiter_delay,
+                            "MaxAttempts": self.waiter_max_attempts,
+                        },
+                    )
+                except WaiterError:
+                    if not self.fail_on_nonzero_exit:
+                        # Enhanced mode: distinguish between AWS-level and command-level failures
+                        invocation = self.hook.get_command_invocation(command_id, instance_id)
+                        status = invocation.get("Status", "")
+
+                        # AWS-level failures should always raise
+                        if status in ("Cancelled", "TimedOut"):
+                            self.log.error("Command failed with AWS-level error: %s", status)

Review Comment:
   We probably don't need this log line? Today it fails without a log when these situations happen.
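   For reference, here is a minimal sketch of the branch without the extra log, assuming a bare `raise` is used. `WaiterError` here is a local stand-in for `botocore.exceptions.WaiterError` so the snippet runs without botocore, and `wait_for_instance` is a hypothetical simplification of the per-instance loop body:

```python
class WaiterError(Exception):
    """Local stand-in for botocore.exceptions.WaiterError so this sketch runs without botocore."""


def wait_for_instance(wait, get_status, fail_on_nonzero_exit: bool) -> str:
    """Hypothetical, simplified version of the per-instance wait in the operator."""
    try:
        wait()
        return "Success"
    except WaiterError:
        if fail_on_nonzero_exit:
            raise
        status = get_status()
        if status in ("Cancelled", "TimedOut"):
            # No log.error here: the bare `raise` propagates the original
            # WaiterError, message and traceback included.
            raise
        # Command-level failure tolerated in this mode; report the final status.
        return status
```

   The bare `raise` re-raises the original exception unchanged, so its message already ends up in the task log when the task fails.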



##########
providers/amazon/src/airflow/providers/amazon/aws/operators/ssm.py:
##########
@@ -132,14 +143,36 @@ def execute(self, context: Context):
 
             instance_ids = response["Command"]["InstanceIds"]
             for instance_id in instance_ids:
-                waiter.wait(
-                    CommandId=command_id,
-                    InstanceId=instance_id,
-                    WaiterConfig={
-                        "Delay": self.waiter_delay,
-                        "MaxAttempts": self.waiter_max_attempts,
-                    },
-                )
+                try:
+                    waiter.wait(
+                        CommandId=command_id,
+                        InstanceId=instance_id,
+                        WaiterConfig={
+                            "Delay": self.waiter_delay,
+                            "MaxAttempts": self.waiter_max_attempts,
+                        },
+                    )
+                except WaiterError:
+                    if not self.fail_on_nonzero_exit:
+                        # Enhanced mode: distinguish between AWS-level and command-level failures
+                        invocation = self.hook.get_command_invocation(command_id, instance_id)
+                        status = invocation.get("Status", "")
+
+                        # AWS-level failures should always raise
+                        if status in ("Cancelled", "TimedOut"):

Review Comment:
   Are these the only AWS-level failures? I'm always cautious of static lists like this.
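   One way to avoid the static list, sketched under assumptions: invert the check so that only the single tolerated case is allowlisted and every other status still raises. This relies on two assumptions. First, the `GetCommandInvocation` `Status` values are the ones AWS documents (`Pending`, `InProgress`, `Delayed`, `Success`, `Cancelled`, `TimedOut`, `Failed`, `Cancelling`). Second, a `ResponseCode` of `-1` means the command never started on the node or was not received by it. `is_command_level_failure` is a hypothetical name:

```python
def is_command_level_failure(invocation: dict) -> bool:
    """Hypothetical check: True only when the script actually ran and failed.

    Allowlists the one case to tolerate instead of listing AWS-level statuses
    to raise on, so any other (or future) status still fails the task.
    """
    status = invocation.get("Status")
    response_code = invocation.get("ResponseCode")
    # Assumption: ResponseCode == -1 means the command never ran on the node.
    return status == "Failed" and isinstance(response_code, int) and response_code != -1
```

   With this shape, a status AWS adds later falls through to the raise path instead of being silently treated as a script failure.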



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
