uranusjr commented on a change in pull request #16904:
URL: https://github.com/apache/airflow/pull/16904#discussion_r672007981
##########
File path: airflow/providers/amazon/aws/sensors/sqs.py
##########
@@ -102,3 +152,39 @@ def get_hook(self) -> SQSHook:
self.hook = SQSHook(aws_conn_id=self.aws_conn_id)
return self.hook
+
+ def filter_messages(self, messages):
+ if self.message_filtering == 'literal':
+ return self.filter_messages_literal(messages)
+ if self.message_filtering == 'jsonpath':
+ return self.filter_messages_jsonpath(messages)
+ else:
+ raise NotImplementedError('Override this method to define custom
filters')
+
+ def filter_messages_literal(self, messages):
+ filtered_messages = []
+ if self.message_filtering_match_values is None:
+ raise Exception('message_filtering_match_values must be specified
for literal matching')
Review comment:
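This should be checked in `__init__` and should raise a `TypeError`
instead, so that a misconfigured sensor fails when it is constructed rather
than only once messages are being filtered.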
##########
File path: airflow/providers/amazon/aws/sensors/sqs.py
##########
@@ -48,13 +68,26 @@ def __init__(
aws_conn_id: str = 'aws_default',
max_messages: int = 5,
wait_time_seconds: int = 1,
+ visibility_timeout: Optional[int] = None,
+ message_filtering: Optional[str] = None,
+ message_filtering_match_values: Optional[Any] = None,
+ message_filtering_config: Optional[Any] = None,
**kwargs,
):
super().__init__(**kwargs)
self.sqs_queue = sqs_queue
self.aws_conn_id = aws_conn_id
self.max_messages = max_messages
self.wait_time_seconds = wait_time_seconds
+ self.visibility_timeout = visibility_timeout
+
+ self.message_filtering = message_filtering
+ if message_filtering_match_values is not None:
+ if not isinstance(message_filtering_match_values, list):
+ message_filtering_match_values =
[message_filtering_match_values]
+ self.message_filtering_match_values = message_filtering_match_values
Review comment:
I wonder whether `message_filtering_match_values` should be a `set` instead;
this could improve performance quite a bit, since the check runs in a pretty
tight loop.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]