dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1877457974


##########
providers/src/airflow/providers/microsoft/azure/operators/asb.py:
##########
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
                 self.log.info("Topic %s deleted.", self.topic_name)
             else:
                 self.log.info("Topic %s does not exist.", self.topic_name)
+
+
+class AzureServiceBusRequestReplyOperator(BaseOperator):
+    """
+    Implement request-reply pattern using Azure Service Bus.
+
+    Send a message to an Azure Service Bus Queue and receive a reply by 
correlation id from an Azure Service
+    Bus Topic. This implements the Request-Reply pattern from Enterprise 
Integration Patterns, Hohpe, Woolf,
+    Addison-Wesley, 2003: 
https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
+
+    Steps are:
+        1. Generate a unique ID for the message. The subscription needs to 
exist before the request message is
+            sent or there will be a race condition where the reply message 
could be sent before the
+            subscription is created. By default, a UUID is used for the unique 
ID and this should be
+            reasonably unique over the life of the universe.
+        2. Create a subscription to the reply topic for messages where the 
correlation ID equals the unique ID
+            created in #1.
+        3. Send the message to the request queue with (a) the reply-to 
property set to the reply topic,
+            (b) the reply type property set to topic, and (c) the message ID 
set to the unique ID.
+        4. Wait for a reply message on the reply topic with the correlation ID 
set to the unique ID.
+        5. Remove the subscription on the reply topic.
+
+    The caller must pass in a generator function to create the request message 
body (request_body_generator)
+    from the context and an optional callback can  process the reply message 
(reply_message_callback). This
+    callback could either detect errors and abort processing by throwing an 
exception or could process the
+    message body and add information into XComs for downstream tasks to use. 
The remote service can send back
+    the message in any format it chooses. The supplied callback should be able 
to handle the message format.
+
+    The remote service should reply to the topic or queue specified in the 
reply_to property of the request
+    message. The remote service can tell if the reply should go to a topic or 
a queue based on the reply_type
+    although the current implementation expects all replies to be sent through 
a topic. The reply message
+    should have the correlation ID set to the message ID of the request 
message.
+
+    :param request_queue_name: Name of the queue to send the request to. This 
queue must be reachable from
+        a connection created from the connection name specified in the param 
`azure_service_bus_conn_id`.
+    :param request_body_generator: A method to generate the request message 
body from the context.
+    :param reply_topic_name: Name of the topic to send the reply to. This 
topic must be reachable from
+        a connection created from the connection name specified in the param 
"azure_service_bus_conn_id".
+    :param reply_correlation_id: a string to use to correlate the request and 
reply. This will also be the
+        message ID of the request message. If not specified, a UUID will be 
generated and used. Generally,
+        the default behavior should be used.
+    :param max_wait_time: maximum wait for a reply in seconds. This should be 
set to some small multiple of
+        the expected processing time. Perhaps 3x the expected processing time. 
Default is 60 seconds.
+    :param reply_message_callback: An optional callback to handle the response 
message. This takes the service
+        bus message and the context as parameters and can raise an exception 
to abort processing or insert
+        values into the XCOM for downstream tasks to use.
+    :param azure_service_bus_conn_id: ID of the airflow connection to the 
Azure Service Bus. It defaults to
+        `azure_service_bus_default`,
+    """
+
+    REPLY_SUBSCRIPTION_PREFIX = "reply-"
+    REPLY_RULE_SUFFIX = "-rule"
+
+    def __init__(
+        self,
+        *,
+        request_queue_name: str,
+        request_body_generator: Callable[[Context], str],
+        reply_topic_name: str,
+        reply_correlation_id: str | None = None,
+        max_wait_time: float = 60,
+        reply_message_callback: MessageCallback | None = None,
+        azure_service_bus_conn_id: str = "azure_service_bus_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.request_queue_name = request_queue_name
+        self.request_body_generator = request_body_generator
+        self.reply_topic_name = reply_topic_name
+        self.reply_correlation_id = reply_correlation_id
+        self.max_wait_time = max_wait_time
+        self.reply_message_callback = reply_message_callback
+        self.azure_service_bus_conn_id = azure_service_bus_conn_id
+
+        if not self.reply_correlation_id:
+            self.reply_correlation_id = str(uuid4())
+        self.subscription_name = self.REPLY_SUBSCRIPTION_PREFIX + 
self.reply_correlation_id
+
+    def execute(self, context: Context) -> None:
+        """Implement the request-reply pattern using existing hooks."""
+        self._validate_params()
+        admin_hook = 
AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
+        self._create_reply_subscription_for_correlation_id(admin_hook, context)
+
+        message_hook = 
MessageHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
+        try:
+            # send the request message
+            self._send_request_message(message_hook, context)
+
+            # Wait for and receive the reply message
+            message_hook.receive_subscription_message(
+                self.reply_topic_name,
+                self.subscription_name,
+                context,
+                max_message_count=1,
+                max_wait_time=self.max_wait_time,
+                message_callback=self.reply_message_callback,
+            )
+        finally:
+            # Remove the subscription on the reply topic
+            self._remove_reply_subscription(admin_hook)
+
+    def _send_request_message(self, message_hook: MessageHook, context: 
Context) -> None:

Review Comment:
   All those protected methods which take the MessageHook or AdminClientHook as 
a parameter should become public methods of the corresponding hook.  Most of 
the logic should always be located in the hooks, not in the operator. Think of 
the operator as a kind of facilitator within DAGs which delegates the heavy 
lifting to the hooks, so that the hooks in turn can also be easily 
(re)used within PythonOperators without the need to rewrite the same logic as 
in the operators.
   
   So in this case, the `_send_request_message` method should be part of MessageHook.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to