dabla commented on code in PR #44675: URL: https://github.com/apache/airflow/pull/44675#discussion_r1877457974
########## providers/src/airflow/providers/microsoft/azure/operators/asb.py: ########## @@ -650,3 +653,189 @@ def execute(self, context: Context) -> None: self.log.info("Topic %s deleted.", self.topic_name) else: self.log.info("Topic %s does not exist.", self.topic_name) + + +class AzureServiceBusRequestReplyOperator(BaseOperator): + """ + Implement request-reply pattern using Azure Service Bus. + + Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service + Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf, + Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html + + Steps are: + 1. Generate a unique ID for the message. The subscription needs to exist before the request message is + sent or there will be a race condition where the reply message could be sent before the + subscription is created. By default, a UUID is used for the unique ID and this should be + reasonably unique over the life of the universe. + 2. Create a subscription to the reply topic for messages where the correlation ID equals the unique ID + created in #1. + 3. Send the message to the request queue with (a) the reply-to property set to the reply topic, + (b) the reply type property set to topic, and (c) the message ID set to the unique ID. + 4. Wait for a reply message on the reply topic with the correlation ID set to the unique ID. + 5. Remove the subscription on the reply topic. + + The caller must pass in a generator function to create the request message body (request_body_generator) + from the context and an optional callback can process the reply message (reply_message_callback). This + callback could either detect errors and abort processing by throwing an exception or could process the + message body and add information into XComs for downstream tasks to use. The remote service can send back + the message in any format it chooses. 
The supplied callback should be able to handle the message format. + + The remote service should reply to the topic or queue specified in the reply_to property of the request + message. The remote service can tell if the reply should go to a topic or a queue based on the reply_type + although the current implementation expects all replies to be sent through a topic. The reply message + should have the correlation ID set to the message ID of the request message. + + :param request_queue_name: Name of the queue to send the request to. This queue must be reachable from + a connection created from the connection name specified in the param `azure_service_bus_conn_id`. + :param request_body_generator: A method to generate the request message body from the context. + :param reply_topic_name: Name of the topic to send the reply to. This topic must be reachable from + a connection created from the connection name specified in the param "azure_service_bus_conn_id". + :param reply_correlation_id: a string to use to correlate the request and reply. This will also be the + message ID of the request message. If not specified, a UUID will be generated and used. Generally, + the default behavior should be used. + :param max_wait_time: maximum wait for a reply in seconds. This should be set to some small multiple of + the expected processing time. Perhaps 3x the expected processing time. Default is 60 seconds. + :param reply_message_callback: An optional callback to handle the response message. This takes the service + bus message and the context as parameters and can raise an exception to abort processing or insert + values into the XCOM for downstream tasks to use. + :param azure_service_bus_conn_id: ID of the airflow connection to the Azure Service Bus. 
It defaults to + `azure_service_bus_default`, + """ + + REPLY_SUBSCRIPTION_PREFIX = "reply-" + REPLY_RULE_SUFFIX = "-rule" + + def __init__( + self, + *, + request_queue_name: str, + request_body_generator: Callable[[Context], str], + reply_topic_name: str, + reply_correlation_id: str | None = None, + max_wait_time: float = 60, + reply_message_callback: MessageCallback | None = None, + azure_service_bus_conn_id: str = "azure_service_bus_default", + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.request_queue_name = request_queue_name + self.request_body_generator = request_body_generator + self.reply_topic_name = reply_topic_name + self.reply_correlation_id = reply_correlation_id + self.max_wait_time = max_wait_time + self.reply_message_callback = reply_message_callback + self.azure_service_bus_conn_id = azure_service_bus_conn_id + + if not self.reply_correlation_id: + self.reply_correlation_id = str(uuid4()) + self.subscription_name = self.REPLY_SUBSCRIPTION_PREFIX + self.reply_correlation_id + + def execute(self, context: Context) -> None: + """Implement the request-reply pattern using existing hooks.""" + self._validate_params() + admin_hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id) + self._create_reply_subscription_for_correlation_id(admin_hook, context) + + message_hook = MessageHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id) + try: + # send the request message + self._send_request_message(message_hook, context) + + # Wait for and receive the reply message + message_hook.receive_subscription_message( + self.reply_topic_name, + self.subscription_name, + context, + max_message_count=1, + max_wait_time=self.max_wait_time, + message_callback=self.reply_message_callback, + ) + finally: + # Remove the subscription on the reply topic + self._remove_reply_subscription(admin_hook) + + def _send_request_message(self, message_hook: MessageHook, context: Context) -> None: Review Comment: All those protected 
methods that take the MessageHook or AdminClientHook as a parameter should become public methods of the corresponding hook. Most of the logic should always be located in the hooks, not in the operator. Think of the operator as a kind of facilitator within DAGs that delegates the heavy lifting to the hooks, so that the hooks, in turn, can also be easily (re)used within PythonOperators without the need to rewrite the same logic as in the operators. So in this case, the _send_request_message method should be part of MessageHook. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org