uranusjr commented on a change in pull request #16904:
URL: https://github.com/apache/airflow/pull/16904#discussion_r669238131



##########
File path: airflow/providers/amazon/aws/sensors/sqs.py
##########
@@ -69,31 +89,48 @@ def poke(self, context):
 
         self.log.info('SQSSensor checking for message on queue: %s', 
self.sqs_queue)
 
-        messages = sqs_conn.receive_message(
-            QueueUrl=self.sqs_queue,
-            MaxNumberOfMessages=self.max_messages,
-            WaitTimeSeconds=self.wait_time_seconds,
-        )
+        receive_message_kwargs = {
+            'QueueUrl': self.sqs_queue,
+            'MaxNumberOfMessages': self.max_messages,
+            'WaitTimeSeconds': self.wait_time_seconds,
+        }
+        if self.visibility_timeout is not None:
+            receive_message_kwargs['VisibilityTimeout'] = 
self.visibility_timeout
+
+        response = sqs_conn.receive_message(**receive_message_kwargs)
+
+        if "Messages" not in response:
+            return False
 
-        self.log.info("received message %s", str(messages))
+        messages = response['Messages']
+        num_messages = len(messages)
+        self.log.info("received %s messages", str(num_messages))

Review comment:
       ```suggestion
           self.log.info("received %d messages", num_messages)
   ```
   
   The string conversion is redundant since the logger can accept int directly; 
passing the int also ensures the most context is retained, which can be useful 
for people using custom log handlers and formatters. (I know this was done in 
existing code; it is wrong.)
   
   Same for other logs below.

##########
File path: airflow/providers/amazon/aws/sensors/sqs.py
##########
@@ -69,31 +89,48 @@ def poke(self, context):
 
         self.log.info('SQSSensor checking for message on queue: %s', 
self.sqs_queue)
 
-        messages = sqs_conn.receive_message(
-            QueueUrl=self.sqs_queue,
-            MaxNumberOfMessages=self.max_messages,
-            WaitTimeSeconds=self.wait_time_seconds,
-        )
+        receive_message_kwargs = {
+            'QueueUrl': self.sqs_queue,
+            'MaxNumberOfMessages': self.max_messages,
+            'WaitTimeSeconds': self.wait_time_seconds,
+        }
+        if self.visibility_timeout is not None:
+            receive_message_kwargs['VisibilityTimeout'] = 
self.visibility_timeout
+
+        response = sqs_conn.receive_message(**receive_message_kwargs)
+
+        if "Messages" not in response:
+            return False
 
-        self.log.info("received message %s", str(messages))
+        messages = response['Messages']
+        num_messages = len(messages)
+        self.log.info("received %s messages", str(num_messages))
 
-        if 'Messages' in messages and messages['Messages']:
-            entries = [
-                {'Id': message['MessageId'], 'ReceiptHandle': 
message['ReceiptHandle']}
-                for message in messages['Messages']
-            ]
+        if num_messages == 0:

Review comment:
       ```suggestion
           if not num_messages:
   ```
   
   Same for other checks below.

##########
File path: airflow/providers/amazon/aws/sensors/sqs.py
##########
@@ -69,31 +89,48 @@ def poke(self, context):
 
         self.log.info('SQSSensor checking for message on queue: %s', 
self.sqs_queue)
 
-        messages = sqs_conn.receive_message(
-            QueueUrl=self.sqs_queue,
-            MaxNumberOfMessages=self.max_messages,
-            WaitTimeSeconds=self.wait_time_seconds,
-        )
+        receive_message_kwargs = {
+            'QueueUrl': self.sqs_queue,
+            'MaxNumberOfMessages': self.max_messages,
+            'WaitTimeSeconds': self.wait_time_seconds,
+        }
+        if self.visibility_timeout is not None:
+            receive_message_kwargs['VisibilityTimeout'] = 
self.visibility_timeout
+
+        response = sqs_conn.receive_message(**receive_message_kwargs)
+
+        if "Messages" not in response:
+            return False
 
-        self.log.info("received message %s", str(messages))
+        messages = response['Messages']
+        num_messages = len(messages)
+        self.log.info("received %s messages", str(num_messages))
 
-        if 'Messages' in messages and messages['Messages']:
-            entries = [
-                {'Id': message['MessageId'], 'ReceiptHandle': 
message['ReceiptHandle']}
-                for message in messages['Messages']
-            ]
+        if num_messages == 0:
+            return False
 
-            result = sqs_conn.delete_message_batch(QueueUrl=self.sqs_queue, 
Entries=entries)
+        if self.message_filtering:
+            messages = self.filter_messages(messages)
+            num_messages = len(messages)
+            self.log.info("filtered %s messages", str(num_messages))

Review comment:
       This message is quite ambiguous. Is the count indicating how many 
message are filtered (i.e. dropped), or how many are left after the filter is 
applied?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to