This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 47b72056c4 SQSPublishOperator should allow sending messages to a FIFO Queue (#25171) 47b72056c4 is described below commit 47b72056c46931aef09d63d6d80fbdd3d9128b09 Author: Daniel Barrundia Gonzalez <dbarrund...@gmail.com> AuthorDate: Thu Jul 21 10:25:03 2022 -0600 SQSPublishOperator should allow sending messages to a FIFO Queue (#25171) --- airflow/providers/amazon/aws/hooks/sqs.py | 19 +++++++++----- airflow/providers/amazon/aws/operators/sqs.py | 13 +++++++++- tests/providers/amazon/aws/operators/test_sqs.py | 33 ++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 7 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/sqs.py b/airflow/providers/amazon/aws/hooks/sqs.py index b94756f63a..d3bcd837c9 100644 --- a/airflow/providers/amazon/aws/hooks/sqs.py +++ b/airflow/providers/amazon/aws/hooks/sqs.py @@ -58,6 +58,7 @@ class SqsHook(AwsBaseHook): message_body: str, delay_seconds: int = 0, message_attributes: Optional[Dict] = None, + message_group_id: Optional[str] = None, ) -> Dict: """ Send message to the queue @@ -67,17 +68,23 @@ class SqsHook(AwsBaseHook): :param delay_seconds: seconds to delay the message :param message_attributes: additional attributes for the message (default: None) For details of the attributes parameter see :py:meth:`botocore.client.SQS.send_message` + :param message_group_id: This applies only to FIFO (first-in-first-out) queues. (default: None) + For details of the attributes parameter see :py:meth:`botocore.client.SQS.send_message` :return: dict with the information about the message sent For details of the returned value see :py:meth:`botocore.client.SQS.send_message` :rtype: dict """ - return self.get_conn().send_message( - QueueUrl=queue_url, - MessageBody=message_body, - DelaySeconds=delay_seconds, - MessageAttributes=message_attributes or {}, - ) + params = { + 'QueueUrl': queue_url, + 'MessageBody': message_body, + 'DelaySeconds': delay_seconds, + 'MessageAttributes': message_attributes or {}, + } + if message_group_id: + params['MessageGroupId'] = message_group_id + + return self.get_conn().send_message(**params) class SQSHook(SqsHook): diff --git a/airflow/providers/amazon/aws/operators/sqs.py b/airflow/providers/amazon/aws/operators/sqs.py index 6eff54134a..c9874c4d1e 100644 --- a/airflow/providers/amazon/aws/operators/sqs.py +++ b/airflow/providers/amazon/aws/operators/sqs.py @@ -39,10 +39,18 @@ class SqsPublishOperator(BaseOperator): :param message_attributes: additional attributes for the message (default: None) For details of the attributes parameter see :py:meth:`botocore.client.SQS.send_message` :param delay_seconds: message delay (templated) (default: 1 second) + :param message_group_id: This parameter applies only to FIFO (first-in-first-out) queues. (default: None) + For details of the attributes parameter see :py:meth:`botocore.client.SQS.send_message` :param aws_conn_id: AWS connection id (default: aws_default) """ - template_fields: Sequence[str] = ('sqs_queue', 'message_content', 'delay_seconds', 'message_attributes') + template_fields: Sequence[str] = ( + 'sqs_queue', + 'message_content', + 'delay_seconds', + 'message_attributes', + 'message_group_id', + ) template_fields_renderers = {'message_attributes': 'json'} ui_color = '#6ad3fa' @@ -53,6 +61,7 @@ class SqsPublishOperator(BaseOperator): message_content: str, message_attributes: Optional[dict] = None, delay_seconds: int = 0, + message_group_id: Optional[str] = None, aws_conn_id: str = 'aws_default', **kwargs, ): @@ -62,6 +71,7 @@ class SqsPublishOperator(BaseOperator): self.message_content = message_content self.delay_seconds = delay_seconds self.message_attributes = message_attributes or {} + self.message_group_id = message_group_id def execute(self, context: 'Context'): """ @@ -79,6 +89,7 @@ class SqsPublishOperator(BaseOperator): message_body=self.message_content, delay_seconds=self.delay_seconds, message_attributes=self.message_attributes, + message_group_id=self.message_group_id, ) self.log.info('send_message result: %s', result) diff --git a/tests/providers/amazon/aws/operators/test_sqs.py b/tests/providers/amazon/aws/operators/test_sqs.py index 915fca9bfb..1ce152bbf1 100644 --- a/tests/providers/amazon/aws/operators/test_sqs.py +++ b/tests/providers/amazon/aws/operators/test_sqs.py @@ -20,6 +20,8 @@ import unittest from unittest.mock import MagicMock +import pytest +from botocore.exceptions import ClientError from moto import mock_sqs from airflow.models.dag import DAG @@ -32,6 +34,9 @@ DEFAULT_DATE = timezone.datetime(2019, 1, 1) QUEUE_NAME = 'test-queue' QUEUE_URL = f'https://{QUEUE_NAME}' +FIFO_QUEUE_NAME = 'test-queue.fifo' +FIFO_QUEUE_URL = f'https://{FIFO_QUEUE_NAME}' + class TestSqsPublishOperator(unittest.TestCase): def setUp(self): @@ -66,3 +71,31 @@ class TestSqsPublishOperator(unittest.TestCase): context_calls = [] assert self.mock_context['ti'].method_calls == context_calls, "context call should be same" + + @mock_sqs + def test_execute_failure_fifo_queue(self): + self.operator.sqs_queue = FIFO_QUEUE_URL + self.sqs_hook.create_queue(FIFO_QUEUE_NAME, attributes={'FifoQueue': 'true'}) + with pytest.raises(ClientError) as ctx: + self.operator.execute(self.mock_context) + err_msg = ( + "An error occurred (MissingParameter) when calling the SendMessage operation: The request must " + "contain the parameter MessageGroupId." + ) + assert err_msg == str(ctx.value) + + @mock_sqs + def test_execute_success_fifo_queue(self): + self.operator.sqs_queue = FIFO_QUEUE_URL + self.operator.message_group_id = "abc" + self.sqs_hook.create_queue(FIFO_QUEUE_NAME, attributes={'FifoQueue': 'true'}) + result = self.operator.execute(self.mock_context) + assert 'MD5OfMessageBody' in result + assert 'MessageId' in result + message = self.sqs_hook.get_conn().receive_message( + QueueUrl=FIFO_QUEUE_URL, AttributeNames=['MessageGroupId'] + ) + assert len(message['Messages']) == 1 + assert message['Messages'][0]['MessageId'] == result['MessageId'] + assert message['Messages'][0]['Body'] == 'hello' + assert message['Messages'][0]['Attributes']['MessageGroupId'] == 'abc'