Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2025-02-07 Thread via GitHub


perry2of5 commented on PR #44675:
URL: https://github.com/apache/airflow/pull/44675#issuecomment-2643616065

   I need to rewrite this to use a dedicated response queue because there is a 
race condition between adding the subscription and modifying its filter. The 
alternatives are to discard any messages before sending the request or to fix 
the Python SDK for ASB, but it seems simpler to use a queue.
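
The race described above can be sketched with an in-memory stand-in. This is not the Azure SDK: `Topic`, `Subscription`, `Message`, and `receive_reply` are hypothetical names, and the sketch assumes a newly created subscription matches every message until its filter is replaced. It only illustrates why a message published in that window ends up in the subscription, and how a dedicated reply queue with a correlation ID check avoids depending on the filter.

```python
import uuid
from collections import deque
from dataclasses import dataclass
from typing import Callable, Deque, Dict, Optional


@dataclass
class Message:
    body: str
    correlation_id: Optional[str] = None


class Subscription:
    """Hypothetical subscription; assumed to start with a match-all rule."""

    def __init__(self) -> None:
        self.rule: Callable[[Message], bool] = lambda message: True
        self.inbox: Deque[Message] = deque()


class Topic:
    """Hypothetical in-memory topic that fans out to filtered subscriptions."""

    def __init__(self) -> None:
        self.subscriptions: Dict[str, Subscription] = {}

    def create_subscription(self, name: str) -> Subscription:
        subscription = Subscription()
        self.subscriptions[name] = subscription
        return subscription

    def publish(self, message: Message) -> None:
        for subscription in self.subscriptions.values():
            if subscription.rule(message):
                subscription.inbox.append(message)


def receive_reply(inbox: Deque[Message], correlation_id: str) -> Optional[Message]:
    """Drain a dedicated reply queue, skipping anything with another correlation ID."""
    while inbox:
        message = inbox.popleft()
        if message.correlation_id == correlation_id:
            return message
    return None


request_id = str(uuid.uuid4())
topic = Topic()

# Race window: the subscription already exists, but its filter is not narrowed yet.
subscription = topic.create_subscription("reply-" + request_id)
topic.publish(Message("reply for someone else", correlation_id="other-request"))
subscription.rule = lambda message: message.correlation_id == request_id
topic.publish(Message("our reply", correlation_id=request_id))
stray = [message.body for message in subscription.inbox]

# Dedicated reply queue: no filter to update, the receiver checks the correlation ID.
reply_queue: Deque[Message] = deque()
reply_queue.append(Message("our reply", correlation_id=request_id))
reply = receive_reply(reply_queue, request_id)
```

In the topic variant, `stray` contains the unrelated message as well as the expected reply; in the queue variant only the matching reply is returned.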


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2025-02-03 Thread via GitHub


github-actions[bot] commented on PR #44675:
URL: https://github.com/apache/airflow/pull/44675#issuecomment-2632467029

   This pull request has been automatically marked as stale because it has not 
had recent activity. It will be closed in 5 days if no further activity occurs. 
Thank you for your contributions.





Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-11 Thread via GitHub


perry2of5 commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1881167025


##
providers/src/airflow/providers/microsoft/azure/operators/asb.py:
##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
 self.log.info("Topic %s deleted.", self.topic_name)
 else:
 self.log.info("Topic %s does not exist.", self.topic_name)
+
+
+class AzureServiceBusRequestReplyOperator(BaseOperator):
+    """
+    Implement request-reply pattern using Azure Service Bus.
+
+    Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service
+    Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf,
+    Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
+
+    Steps are:
+    1. Generate a unique ID for the message. The subscription needs to exist before the request message is

Review Comment:
   Ah, you mean under the top-level docs folder. It never occurred to me that 
the providers would be documented outside the top-level providers folder. I'm 
happy to move things over. I'll try to get to that later this week.






Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-11 Thread via GitHub


perry2of5 commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1880807495


##
providers/src/airflow/providers/microsoft/azure/operators/asb.py:
##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
 self.log.info("Topic %s deleted.", self.topic_name)
 else:
 self.log.info("Topic %s does not exist.", self.topic_name)
+
+
+class AzureServiceBusRequestReplyOperator(BaseOperator):
+    """
+    Implement request-reply pattern using Azure Service Bus.
+
+    Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service
+    Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf,
+    Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
+
+    Steps are:
+    1. Generate a unique ID for the message. The subscription needs to exist before the request message is

Review Comment:
   That seems fair. I'm not sure where to move it to, though. I don't see 
documentation outside of the docstrings. Sorry if I'm missing something 
obvious; I'm not really a Python dev.






Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-11 Thread via GitHub


perry2of5 commented on PR #44675:
URL: https://github.com/apache/airflow/pull/44675#issuecomment-2536947290

   It seems to me that if a message is sent from an Airflow DAG, then the DAG 
author probably wants a message back at some point to confirm completion, check 
for errors, et cetera. To the best of my knowledge, the logic in this PR 
implements the standard design pattern for doing that.





Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-11 Thread via GitHub


eladkal commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1880484135


##
providers/src/airflow/providers/microsoft/azure/operators/asb.py:
##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
 self.log.info("Topic %s deleted.", self.topic_name)
 else:
 self.log.info("Topic %s does not exist.", self.topic_name)
+
+
+class AzureServiceBusRequestReplyOperator(BaseOperator):
+    """
+    Implement request-reply pattern using Azure Service Bus.
+
+    Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service
+    Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf,
+    Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
+
+    Steps are:
+    1. Generate a unique ID for the message. The subscription needs to exist before the request message is

Review Comment:
   Most of what you wrote here needs to be in the docs, not in the class 
docstring.






Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-11 Thread via GitHub


perry2of5 commented on PR #44675:
URL: https://github.com/apache/airflow/pull/44675#issuecomment-2536375618

   When I was working on this new operator I thought about moving some of those 
down into the hooks so I could reuse them. I'll go ahead and put in a PR to 
address that and then update this PR to use those. It will reduce duplication, 
which is a good thing. I'll try to do it this afternoon, but we'll see: I 
suddenly have day-job work to do.





Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-10 Thread via GitHub


dabla commented on PR #44675:
URL: https://github.com/apache/airflow/pull/44675#issuecomment-2533943215

   > Just want to check, right now several of the existing operators get a 
handle to the connection and call the Microsoft Azure library directly. These 
should really be refactored down into the hook as well, right? For example 
this: 
https://github.com/apache/airflow/blob/main/providers/src/airflow/providers/microsoft/azure/operators/asb.py#L316-L339
   
   Good catch. Indeed, it would be better for this code to reside within the 
hook: the hook should take care of the connection handling and expose the 
logic through a public method, which in turn is called from the operator. That 
way the same operation can also be executed from the hook within a 
PythonOperator.
   
   But don't worry: unfortunately there are still a lot of operators written 
that way, but if we clean up every time we need to modify an operator, we will 
get there one day :-)
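
The split described above can be sketched roughly as follows. The classes are plain Python stand-ins, not Airflow or Azure SDK classes: `FakeServiceBusClient`, `MessageHookSketch`, `SendMessageOperatorSketch`, and `send_message` are hypothetical names chosen for illustration. The hook owns the connection and exposes the logic as a public method; the operator only delegates, so the same method can also be called from any plain callable.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple


class FakeServiceBusClient:
    """Hypothetical stand-in for an SDK client; it only records what was sent."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, str]] = []

    def send(self, queue_name: str, body: str) -> None:
        self.sent.append((queue_name, body))


class MessageHookSketch:
    """Hypothetical hook: handles the connection and exposes the logic publicly."""

    def __init__(self, client_factory: Callable[[], FakeServiceBusClient]) -> None:
        self._client_factory = client_factory
        self._client: Optional[FakeServiceBusClient] = None

    def get_conn(self) -> FakeServiceBusClient:
        # Create the client lazily and reuse it afterwards.
        if self._client is None:
            self._client = self._client_factory()
        return self._client

    def send_message(self, queue_name: str, body: str) -> None:
        if not queue_name:
            raise ValueError("queue_name is required")
        self.get_conn().send(queue_name, body)


class SendMessageOperatorSketch:
    """Hypothetical operator: no SDK calls here, it only delegates to the hook."""

    def __init__(self, hook: MessageHookSketch, queue_name: str, body: str) -> None:
        self.hook = hook
        self.queue_name = queue_name
        self.body = body

    def execute(self, context: Dict[str, Any]) -> None:
        self.hook.send_message(self.queue_name, self.body)


client = FakeServiceBusClient()
hook = MessageHookSketch(lambda: client)

# Used through the operator ...
SendMessageOperatorSketch(hook, "requests", "hello").execute({})
# ... or directly from the hook, as one would inside a Python callable.
hook.send_message("requests", "sent from a plain callable")
```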





Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-10 Thread via GitHub


perry2of5 commented on PR #44675:
URL: https://github.com/apache/airflow/pull/44675#issuecomment-2532375809

   Just want to check, right now several of the existing operators get a handle 
to the connection and call the Microsoft Azure library directly. These should 
really be refactored down into the hook as well, right? For example this: 
https://github.com/apache/airflow/blob/main/providers/src/airflow/providers/microsoft/azure/operators/asb.py#L316-L339





Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-10 Thread via GitHub


perry2of5 commented on PR #44675:
URL: https://github.com/apache/airflow/pull/44675#issuecomment-2532331911

   Thank you for the review.





Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-10 Thread via GitHub


dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1877459842


##
providers/src/airflow/providers/microsoft/azure/operators/asb.py:
##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
 self.log.info("Topic %s deleted.", self.topic_name)
 else:
 self.log.info("Topic %s does not exist.", self.topic_name)
+
+
+class AzureServiceBusRequestReplyOperator(BaseOperator):
+    """
+    Implement request-reply pattern using Azure Service Bus.
+
+    Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service
+    Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf,
+    Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
+
+    Steps are:
+    1. Generate a unique ID for the message. The subscription needs to exist before the request message is
+       sent or there will be a race condition where the reply message could be sent before the
+       subscription is created. By default, a UUID is used for the unique ID and this should be
+       reasonably unique over the life of the universe.
+    2. Create a subscription to the reply topic for messages where the correlation ID equals the unique ID
+       created in #1.
+    3. Send the message to the request queue with (a) the reply-to property set to the reply topic,
+       (b) the reply type property set to topic, and (c) the message ID set to the unique ID.
+    4. Wait for a reply message on the reply topic with the correlation ID set to the unique ID.
+    5. Remove the subscription on the reply topic.
+
+    The caller must pass in a generator function to create the request message body (request_body_generator)
+    from the context and an optional callback can process the reply message (reply_message_callback). This
+    callback could either detect errors and abort processing by throwing an exception or could process the
+    message body and add information into XComs for downstream tasks to use. The remote service can send back
+    the message in any format it chooses. The supplied callback should be able to handle the message format.
+
+    The remote service should reply to the topic or queue specified in the reply_to property of the request
+    message. The remote service can tell if the reply should go to a topic or a queue based on the reply_type
+    although the current implementation expects all replies to be sent through a topic. The reply message
+    should have the correlation ID set to the message ID of the request message.
+
+    :param request_queue_name: Name of the queue to send the request to. This queue must be reachable from
+        a connection created from the connection name specified in the param `azure_service_bus_conn_id`.
+    :param request_body_generator: A method to generate the request message body from the context.
+    :param reply_topic_name: Name of the topic to send the reply to. This topic must be reachable from
+        a connection created from the connection name specified in the param "azure_service_bus_conn_id".
+    :param reply_correlation_id: a string to use to correlate the request and reply. This will also be the
+        message ID of the request message. If not specified, a UUID will be generated and used. Generally,
+        the default behavior should be used.
+    :param max_wait_time: maximum wait for a reply in seconds. This should be set to some small multiple of
+        the expected processing time. Perhaps 3x the expected processing time. Default is 60 seconds.
+    :param reply_message_callback: An optional callback to handle the response message. This takes the service
+        bus message and the context as parameters and can raise an exception to abort processing or insert
+        values into the XCOM for downstream tasks to use.
+    :param azure_service_bus_conn_id: ID of the airflow connection to the Azure Service Bus. It defaults to
+        `azure_service_bus_default`,
+    """
+
+    REPLY_SUBSCRIPTION_PREFIX = "reply-"
+    REPLY_RULE_SUFFIX = "-rule"
+
+    def __init__(
+        self,
+        *,
+        request_queue_name: str,
+        request_body_generator: Callable[[Context], str],
+        reply_topic_name: str,
+        reply_correlation_id: str | None = None,
+        max_wait_time: float = 60,
+        reply_message_callback: MessageCallback | None = None,
+        azure_service_bus_conn_id: str = "azure_service_bus_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.request_queue_name = request_queue_name
+        self.request_body_generator = request_body_generator
+        self.reply_topic_name = reply_topic_name
+        self.reply_correlation_id = reply_correlation_id
+
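
The two callables named in the docstring quoted above, `request_body_generator` and `reply_message_callback`, might look something like the sketch below. It is a guess at their shape based only on the docstring and the `__init__` signature: the JSON payload layout, the `FakeReplyMessage` and `FakeTaskInstance` stand-ins, and the `reply_result` XCom key are all assumptions, and the real Service Bus message type may expose its body differently (for example as bytes), in which case the decoding step would need adapting.

```python
import json
from typing import Any, Dict


class FakeTaskInstance:
    """Stand-in for the task instance found in the context; records pushed values."""

    def __init__(self) -> None:
        self.xcom: Dict[str, Any] = {}

    def xcom_push(self, key: str, value: Any) -> None:
        self.xcom[key] = value


class FakeReplyMessage:
    """Stand-in for a reply message; assumed here to carry its body as a string."""

    def __init__(self, body: str) -> None:
        self.body = body


def build_request_body(context: Dict[str, Any]) -> str:
    # Build the request payload from the context (field names are illustrative).
    return json.dumps({"run_id": context["run_id"], "action": "process"})


def handle_reply(message: FakeReplyMessage, context: Dict[str, Any]) -> None:
    # Raise to abort the task on a reported error, otherwise publish the result.
    payload = json.loads(message.body)
    if payload.get("status") != "ok":
        raise RuntimeError(f"Remote service reported failure: {payload}")
    context["ti"].xcom_push(key="reply_result", value=payload.get("result"))


context: Dict[str, Any] = {"run_id": "manual__1", "ti": FakeTaskInstance()}
request_body = build_request_body(context)
handle_reply(FakeReplyMessage('{"status": "ok", "result": 42}'), context)
```

Raising from the callback is what lets a failed remote call fail the task, while the pushed value is what downstream tasks would read.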

Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-10 Thread via GitHub


dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1878295123



Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-10 Thread via GitHub


dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1877457974



Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-10 Thread via GitHub


dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1878295838



Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-10 Thread via GitHub


dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1878295123


##
providers/src/airflow/providers/microsoft/azure/operators/asb.py:
##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
 self.log.info("Topic %s deleted.", self.topic_name)
 else:
 self.log.info("Topic %s does not exist.", self.topic_name)
+
+
+class AzureServiceBusRequestReplyOperator(BaseOperator):
+"""
+Implement request-reply pattern using Azure Service Bus.
+
+    Send a message to an Azure Service Bus Queue and receive a reply by correlation ID from an Azure Service
+    Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf,
+    Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
+
+    Steps are:
+    1. Generate a unique ID for the message. By default, a UUID is used, so collisions are not a practical
+       concern.
+    2. Create a subscription to the reply topic for messages where the correlation ID equals the unique ID
+       created in step 1. The subscription must exist before the request message is sent; otherwise the
+       reply could be published before the subscription is created and would be missed.
+    3. Send the message to the request queue with (a) the reply-to property set to the reply topic,
+       (b) the reply type property set to topic, and (c) the message ID set to the unique ID.
+    4. Wait for a reply message on the reply topic with the correlation ID set to the unique ID.
+    5. Remove the subscription on the reply topic.
+
+    The caller must pass in a generator function that creates the request message body from the context
+    (request_body_generator). An optional callback (reply_message_callback) can process the reply message.
+    This callback can either detect errors and abort processing by raising an exception, or process the
+    message body and push information into XComs for downstream tasks to use. The remote service can send
+    back the message in any format it chooses, so the supplied callback must be able to handle that format.
+
+    The remote service should reply to the topic or queue specified in the reply_to property of the request
+    message. The remote service can tell whether the reply should go to a topic or a queue from the
+    reply_type property, although the current implementation expects all replies to be sent through a
+    topic. The reply message should have the correlation ID set to the message ID of the request message.
+
+    :param request_queue_name: Name of the queue to send the request to. This queue must be reachable from
+        a connection created from the connection name specified in the param `azure_service_bus_conn_id`.
+    :param request_body_generator: A callable that generates the request message body from the context.
+    :param reply_topic_name: Name of the topic the reply is sent to. This topic must be reachable from
+        a connection created from the connection name specified in the param `azure_service_bus_conn_id`.
+    :param reply_correlation_id: A string used to correlate the request and reply. This will also be the
+        message ID of the request message. If not specified, a UUID will be generated and used. Generally,
+        the default behavior should be used.
+    :param max_wait_time: Maximum time to wait for a reply, in seconds. This should be set to a small
+        multiple of the expected processing time, for example 3x. Default is 60 seconds.
+    :param reply_message_callback: An optional callback to handle the response message. It takes the
+        service bus message and the context as parameters and can raise an exception to abort processing
+        or push values into XCom for downstream tasks to use.
+    :param azure_service_bus_conn_id: ID of the Airflow connection to the Azure Service Bus. Defaults to
+        `azure_service_bus_default`.
+    """
+
+    REPLY_SUBSCRIPTION_PREFIX = "reply-"
+    REPLY_RULE_SUFFIX = "-rule"
+
+    def __init__(
+        self,
+        *,
+        request_queue_name: str,
+        request_body_generator: Callable[[Context], str],
+        reply_topic_name: str,
+        reply_correlation_id: str | None = None,
+        max_wait_time: float = 60,
+        reply_message_callback: MessageCallback | None = None,
+        azure_service_bus_conn_id: str = "azure_service_bus_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.request_queue_name = request_queue_name
+        self.request_body_generator = request_body_generator
+        self.reply_topic_name = reply_topic_name
+        self.reply_correlation_id = reply_correlation_id
+  
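
The five steps listed in the docstring can be illustrated with a small in-memory simulation. This is only a sketch of the pattern, not the operator from the PR and not the Azure SDK: `InMemoryBus`, `remote_service`, `request_reply` and the dictionary message layout are hypothetical names invented for the example, and the remote service runs inline instead of in another process. The "reply-" prefix mirrors REPLY_SUBSCRIPTION_PREFIX from the diff.

```python
import uuid
from collections import deque


class InMemoryBus:
    """Toy stand-in for a broker: one request queue plus filtered topic subscriptions."""

    def __init__(self):
        self.request_queue = deque()
        # subscription name -> (correlation ID to match, delivered messages)
        self.subscriptions = {}

    def create_subscription(self, name, correlation_id):
        self.subscriptions[name] = (correlation_id, deque())

    def delete_subscription(self, name):
        self.subscriptions.pop(name, None)

    def publish_reply(self, message):
        # A topic copies a message to every subscription whose filter matches;
        # a message that matches no subscription is simply dropped.
        for wanted_id, inbox in self.subscriptions.values():
            if message["correlation_id"] == wanted_id:
                inbox.append(message)

    def receive(self, name):
        _, inbox = self.subscriptions[name]
        return inbox.popleft() if inbox else None


def remote_service(bus):
    """Hypothetical responder: replies with the upper-cased request body."""
    while bus.request_queue:
        request = bus.request_queue.popleft()
        bus.publish_reply(
            {"correlation_id": request["message_id"], "body": request["body"].upper()}
        )


def request_reply(bus, request_body_generator, context, reply_message_callback=None):
    correlation_id = str(uuid.uuid4())  # step 1: unique ID
    subscription = "reply-" + correlation_id
    bus.create_subscription(subscription, correlation_id)  # step 2: subscribe first
    try:
        bus.request_queue.append(  # step 3: send the request
            {
                "message_id": correlation_id,
                "reply_to": "reply-topic",
                "reply_type": "topic",
                "body": request_body_generator(context),
            }
        )
        remote_service(bus)  # in a real deployment this runs elsewhere
        reply = bus.receive(subscription)  # step 4: wait for the reply
        if reply is None:
            raise TimeoutError("no reply received")
        if reply_message_callback is not None:
            reply_message_callback(reply, context)
        return reply
    finally:
        bus.delete_subscription(subscription)  # step 5: always clean up


def store_reply(message, context):
    # Example callback: keep the reply body for later use.
    context["reply_body"] = message["body"]


bus = InMemoryBus()
context = {"run_id": "demo"}
reply = request_reply(bus, lambda ctx: "hello from " + ctx["run_id"], context, store_reply)
```

Creating the subscription before sending the request is what avoids the lost-reply race described in step 2, and the `finally` block keeps a failed or timed-out request from leaving a stale subscription behind.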

Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-10 Thread via GitHub


dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1878294037


##
providers/src/airflow/providers/microsoft/azure/operators/asb.py:
##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
 self.log.info("Topic %s deleted.", self.topic_name)
 else:
 self.log.info("Topic %s does not exist.", self.topic_name)

Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-10 Thread via GitHub


dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1878291301


##
providers/src/airflow/providers/microsoft/azure/operators/asb.py:
##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
 self.log.info("Topic %s deleted.", self.topic_name)
 else:
 self.log.info("Topic %s does not exist.", self.topic_name)

Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-10 Thread via GitHub


dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1877459842


##
providers/src/airflow/providers/microsoft/azure/operators/asb.py:
##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
 self.log.info("Topic %s deleted.", self.topic_name)
 else:
 self.log.info("Topic %s does not exist.", self.topic_name)

Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-10 Thread via GitHub


dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1878288754


##
providers/src/airflow/providers/microsoft/azure/operators/asb.py:
##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
 self.log.info("Topic %s deleted.", self.topic_name)
 else:
 self.log.info("Topic %s does not exist.", self.topic_name)

Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-10 Thread via GitHub


dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1877457974


##
providers/src/airflow/providers/microsoft/azure/operators/asb.py:
##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
 self.log.info("Topic %s deleted.", self.topic_name)
 else:
 self.log.info("Topic %s does not exist.", self.topic_name)

Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-10 Thread via GitHub


dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1877454268


##
providers/src/airflow/providers/microsoft/azure/operators/asb.py:
##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
 self.log.info("Topic %s deleted.", self.topic_name)
 else:
 self.log.info("Topic %s does not exist.", self.topic_name)

Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-10 Thread via GitHub


dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1877457974


##
providers/src/airflow/providers/microsoft/azure/operators/asb.py:
##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
 self.log.info("Topic %s deleted.", self.topic_name)
 else:
 self.log.info("Topic %s does not exist.", self.topic_name)
+
+
+class AzureServiceBusRequestReplyOperator(BaseOperator):
+    """
+    Implement request-reply pattern using Azure Service Bus.
+
+    Send a message to an Azure Service Bus Queue and receive a reply by correlation ID from an Azure Service
+    Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf,
+    Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
+
+    Steps are:
+    1. Generate a unique ID for the message. By default, a UUID is used for the unique ID and this should be
+        reasonably unique over the life of the universe.
+    2. Create a subscription to the reply topic for messages where the correlation ID equals the unique ID
+        created in #1. The subscription needs to exist before the request message is sent or there will be
+        a race condition where the reply message could be sent before the subscription is created.
+    3. Send the message to the request queue with (a) the reply-to property set to the reply topic,
+        (b) the reply type property set to topic, and (c) the message ID set to the unique ID.
+    4. Wait for a reply message on the reply topic with the correlation ID set to the unique ID.
+    5. Remove the subscription on the reply topic.
+
+    The caller must pass in a generator function (request_body_generator) that creates the request message
+    body from the context. An optional callback (reply_message_callback) can process the reply message. This
+    callback could either detect errors and abort processing by raising an exception or could process the
+    message body and add information into XComs for downstream tasks to use. The remote service can send back
+    the message in any format it chooses. The supplied callback should be able to handle the message format.
+
+    The remote service should reply to the topic or queue specified in the reply_to property of the request
+    message. The remote service can tell if the reply should go to a topic or a queue based on the reply_type
+    although the current implementation expects all replies to be sent through a topic. The reply message
+    should have the correlation ID set to the message ID of the request message.
+
+    :param request_queue_name: Name of the queue to send the request to. This queue must be reachable from
+        a connection created from the connection name specified in the param `azure_service_bus_conn_id`.
+    :param request_body_generator: A callable that generates the request message body from the context.
+    :param reply_topic_name: Name of the topic the reply is sent to. This topic must be reachable from
+        a connection created from the connection name specified in the param `azure_service_bus_conn_id`.
+    :param reply_correlation_id: A string used to correlate the request and reply. This will also be the
+        message ID of the request message. If not specified, a UUID will be generated and used. Generally,
+        the default behavior should be used.
+    :param max_wait_time: Maximum wait for a reply in seconds. This should be set to some small multiple of
+        the expected processing time, perhaps 3x. Default is 60 seconds.
+    :param reply_message_callback: An optional callback to handle the response message. This takes the service
+        bus message and the context as parameters and can raise an exception to abort processing or insert
+        values into XComs for downstream tasks to use.
+    :param azure_service_bus_conn_id: ID of the Airflow connection to the Azure Service Bus. It defaults to
+        `azure_service_bus_default`.
+    """
+
+    REPLY_SUBSCRIPTION_PREFIX = "reply-"
+    REPLY_RULE_SUFFIX = "-rule"
+
+    def __init__(
+        self,
+        *,
+        request_queue_name: str,
+        request_body_generator: Callable[[Context], str],
+        reply_topic_name: str,
+        reply_correlation_id: str | None = None,
+        max_wait_time: float = 60,
+        reply_message_callback: MessageCallback | None = None,
+        azure_service_bus_conn_id: str = "azure_service_bus_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.request_queue_name = request_queue_name
+        self.request_body_generator = request_body_generator
+        self.reply_topic_name = reply_topic_name
+        self.reply_correlation_id = reply_correlation_id
+  
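
For readers following the thread, the five steps in the docstring above can be sketched without any Azure dependency. This is only an illustration under stated assumptions: `InMemoryBus`, `Message`, `fake_remote_service`, and `request_reply` are hypothetical names invented here, not part of the PR or the Azure SDK, and the fake service replies synchronously, so the timed wait of step 4 is reduced to a single receive.

```python
import uuid
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Message:
    """Hypothetical message carrying only the properties the docstring mentions."""
    body: str
    message_id: Optional[str] = None
    correlation_id: Optional[str] = None
    reply_to: Optional[str] = None
    application_properties: dict = field(default_factory=dict)


class InMemoryBus:
    """Hypothetical in-process stand-in for a broker; this is NOT the Azure SDK."""

    def __init__(self):
        self.queues = {}         # queue name -> list of pending messages
        self.subscriptions = {}  # (topic, subscription) -> (correlation filter, inbox)

    def create_subscription(self, topic, name, correlation_id):
        self.subscriptions[(topic, name)] = (correlation_id, [])

    def delete_subscription(self, topic, name):
        del self.subscriptions[(topic, name)]

    def send_to_queue(self, queue, message):
        self.queues.setdefault(queue, []).append(message)

    def publish(self, topic, message):
        # Deliver only to subscriptions whose filter matches the correlation ID.
        for (sub_topic, _), (wanted_id, inbox) in self.subscriptions.items():
            if sub_topic == topic and wanted_id == message.correlation_id:
                inbox.append(message)

    def receive(self, topic, name):
        _, inbox = self.subscriptions[(topic, name)]
        return inbox.pop(0) if inbox else None


def fake_remote_service(bus, queue):
    """Play the remote service: reply with correlation ID = request message ID."""
    request = bus.queues[queue].pop(0)
    reply = Message(body=request.body.upper(), correlation_id=request.message_id)
    bus.publish(request.reply_to, reply)


def request_reply(bus, request_queue, reply_topic, body, service):
    unique_id = str(uuid.uuid4())                                   # step 1
    subscription = "reply-" + unique_id
    bus.create_subscription(reply_topic, subscription, unique_id)   # step 2
    try:
        request = Message(
            body=body,
            message_id=unique_id,
            reply_to=reply_topic,
            application_properties={"reply_type": "topic"},
        )
        bus.send_to_queue(request_queue, request)                   # step 3
        service(bus, request_queue)  # a real service would run independently
        return bus.receive(reply_topic, subscription)               # step 4
    finally:
        bus.delete_subscription(reply_topic, subscription)          # step 5


bus = InMemoryBus()
reply = request_reply(bus, "requests", "replies", "ping", fake_remote_service)
print(reply.body)  # PING
```

Creating the subscription before sending and removing it in a `finally` block mirrors the ordering the docstring asks for: the reply cannot arrive before a subscriber exists, and the per-request subscription is cleaned up even if the send or the receive fails.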

Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-10 Thread via GitHub


dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1877457974


##
providers/src/airflow/providers/microsoft/azure/operators/asb.py:
##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
 self.log.info("Topic %s deleted.", self.topic_name)
 else:
 self.log.info("Topic %s does not exist.", self.topic_name)

Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-09 Thread via GitHub


dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1877457974


##
providers/src/airflow/providers/microsoft/azure/operators/asb.py:
##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
 self.log.info("Topic %s deleted.", self.topic_name)
 else:
 self.log.info("Topic %s does not exist.", self.topic_name)

Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-09 Thread via GitHub


dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1877459842


##
providers/src/airflow/providers/microsoft/azure/operators/asb.py:
##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
 self.log.info("Topic %s deleted.", self.topic_name)
 else:
 self.log.info("Topic %s does not exist.", self.topic_name)

Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-09 Thread via GitHub


dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1877454268


##
providers/src/airflow/providers/microsoft/azure/operators/asb.py:
##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
 self.log.info("Topic %s deleted.", self.topic_name)
 else:
 self.log.info("Topic %s does not exist.", self.topic_name)

Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-09 Thread via GitHub


dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1877454268


##
providers/src/airflow/providers/microsoft/azure/operators/asb.py:
##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
 self.log.info("Topic %s deleted.", self.topic_name)
 else:
 self.log.info("Topic %s does not exist.", self.topic_name)
+
+
+class AzureServiceBusRequestReplyOperator(BaseOperator):
+"""
+Implement request-reply pattern using Azure Service Bus.
+
+Send a message to an Azure Service Bus Queue and receive a reply by 
correlation id from an Azure Service
+Bus Topic. This implements the Request-Reply pattern from Enterprise 
Integration Patterns, Hohpe, Woolf,
+Addison-Wesley, 2003: 
https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
+
+Steps are:
+1. Generate a unique ID for the message. By default, a UUID is used, 
which is unique for all
+practical purposes.
+2. Create a subscription to the reply topic for messages where the 
correlation ID equals the unique ID
+created in step 1. The subscription must exist before the request message 
is sent; otherwise the
+reply could be sent before the subscription is created and would be 
missed.
+3. Send the message to the request queue with (a) the reply-to 
property set to the reply topic,
+(b) the reply type property set to topic, and (c) the message ID 
set to the unique ID.
+4. Wait for a reply message on the reply topic with the correlation ID 
set to the unique ID.
+5. Remove the subscription on the reply topic.
+
+The caller must pass in a function that creates the request message 
body from the context
+(request_body_generator). An optional callback (reply_message_callback) 
can process the reply message. This
+callback can either detect errors and abort processing by raising an 
exception, or process the
+message body and add information to XComs for downstream tasks to use. 
The remote service can send back
+the message in any format it chooses; the supplied callback must be able 
to handle that format.
+
+The remote service should reply to the topic or queue specified in the 
reply_to property of the request
+message. The remote service can tell if the reply should go to a topic or 
a queue based on the reply_type
+although the current implementation expects all replies to be sent through 
a topic. The reply message
+should have the correlation ID set to the message ID of the request 
message.
+
+:param request_queue_name: Name of the queue to send the request to. This 
queue must be reachable from
+a connection created from the connection name specified in the param 
`azure_service_bus_conn_id`.
+:param request_body_generator: A method to generate the request message 
body from the context.
+:param reply_topic_name: Name of the topic to send the reply to. This 
topic must be reachable from
+a connection created from the connection name specified in the param 
`azure_service_bus_conn_id`.
+:param reply_correlation_id: A string used to correlate the request and 
reply. This will also be the
+message ID of the request message. If not specified, a UUID will be 
generated and used. Generally,
+the default behavior should be used.
+:param max_wait_time: Maximum time to wait for a reply, in seconds. Set 
this to a small multiple of
+the expected processing time, for example 3x. 
Default is 60 seconds.
+:param reply_message_callback: An optional callback to handle the response 
message. This takes the service
+bus message and the context as parameters and can raise an exception 
to abort processing or insert
+values into XCom for downstream tasks to use.
+:param azure_service_bus_conn_id: ID of the Airflow connection to the 
Azure Service Bus. It defaults to
+`azure_service_bus_default`.
+"""
+
+REPLY_SUBSCRIPTION_PREFIX = "reply-"
+REPLY_RULE_SUFFIX = "-rule"
+
+def __init__(
+self,
+*,
+request_queue_name: str,
+request_body_generator: Callable[[Context], str],
+reply_topic_name: str,
+reply_correlation_id: str | None = None,
+max_wait_time: float = 60,
+reply_message_callback: MessageCallback | None = None,
+azure_service_bus_conn_id: str = "azure_service_bus_default",
+**kwargs,
+) -> None:
+super().__init__(**kwargs)
+self.request_queue_name = request_queue_name
+self.request_body_generator = request_body_generator
+self.reply_topic_name = reply_topic_name
+self.reply_correlation_id = reply_correlation_id
+  
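
   The five steps in the docstring can be sketched end to end with a toy in-memory broker. This is only an illustration of the pattern under stated assumptions, not the operator's implementation and not the Azure SDK: `InMemoryBus`, `remote_service` and `request_reply` are hypothetical names, and the "remote service" runs inline instead of in another process.

```python
import uuid
from collections import defaultdict, deque


class InMemoryBus:
    """Toy broker with queues and filtered topic subscriptions (hypothetical)."""

    def __init__(self) -> None:
        self.queues = defaultdict(deque)
        # (topic, subscription name) -> (wanted correlation ID, inbox)
        self.subscriptions = {}

    def create_subscription(self, topic, name, correlation_id) -> None:
        self.subscriptions[(topic, name)] = (correlation_id, deque())

    def delete_subscription(self, topic, name) -> None:
        self.subscriptions.pop((topic, name), None)

    def send_to_queue(self, queue, message) -> None:
        self.queues[queue].append(message)

    def publish(self, topic, message) -> None:
        # Deliver only to subscriptions whose filter matches the correlation ID.
        for (sub_topic, _), (wanted, inbox) in self.subscriptions.items():
            if sub_topic == topic and message["correlation_id"] == wanted:
                inbox.append(message)

    def receive(self, topic, name):
        _, inbox = self.subscriptions[(topic, name)]
        return inbox.popleft() if inbox else None


def remote_service(bus, queue) -> None:
    """Hypothetical responder: upper-cases the body and replies to reply_to."""
    while bus.queues[queue]:
        request = bus.queues[queue].popleft()
        reply = {"correlation_id": request["message_id"], "body": request["body"].upper()}
        bus.publish(request["reply_to"], reply)


def request_reply(bus, request_queue, reply_topic, body):
    correlation_id = str(uuid.uuid4())  # step 1: unique ID
    subscription = "reply-" + correlation_id
    # Step 2: the subscription exists before the request is sent.
    bus.create_subscription(reply_topic, subscription, correlation_id)
    try:
        # Step 3: message ID doubles as the correlation ID of the reply.
        bus.send_to_queue(
            request_queue,
            {"message_id": correlation_id, "reply_to": reply_topic,
             "reply_type": "topic", "body": body},
        )
        remote_service(bus, request_queue)  # stands in for remote processing
        return bus.receive(reply_topic, subscription)  # step 4
    finally:
        bus.delete_subscription(reply_topic, subscription)  # step 5
```

   The `try`/`finally` is a design choice in this sketch: the per-request subscription is removed even when sending or receiving fails, so failed runs do not leave subscriptions behind on the reply topic.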

Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]

2024-12-04 Thread via GitHub


perry2of5 commented on PR #44675:
URL: https://github.com/apache/airflow/pull/44675#issuecomment-2518981249

   This is ready for review.

