Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
perry2of5 commented on PR #44675: URL: https://github.com/apache/airflow/pull/44675#issuecomment-2643616065 I need to rewrite this to use a dedicated response queue because there is a race condition between adding the subscription and modifying its filter. The alternatives are to discard any messages that arrive before the request is sent or to fix the Python SDK for ASB, but it seems simpler to use a queue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
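To illustrate why a dedicated response queue sidesteps the race, here is a minimal in-memory sketch. It is not the PR's code and does not call the Azure SDK: `Message`, `QUEUES`, `remote_service`, and `request_reply` are hypothetical stand-ins built on Python's `queue.Queue`. The assumption is that the reply queue already exists and retains messages until they are received, so nothing has to be created or filtered between sending the request and waiting for the reply.

```python
import queue
import threading
import time
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass
class Message:
    """Toy message carrying only the properties the pattern needs."""

    body: str
    message_id: Optional[str] = None
    correlation_id: Optional[str] = None
    reply_to: Optional[str] = None


# In-memory stand-ins for a request queue and a dedicated reply queue (assumed names).
QUEUES = {"requests": queue.Queue(), "replies": queue.Queue()}


def remote_service() -> None:
    """Handle one request and reply to the queue named in its reply_to property."""
    request = QUEUES["requests"].get(timeout=5)
    reply = Message(body=request.body.upper(), correlation_id=request.message_id)
    QUEUES[request.reply_to].put(reply)


def request_reply(body: str, max_wait_time: float = 5.0) -> Message:
    """Send a request, then wait for the reply whose correlation ID matches."""
    message_id = str(uuid.uuid4())
    QUEUES["requests"].put(Message(body=body, message_id=message_id, reply_to="replies"))
    deadline = time.monotonic() + max_wait_time
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"No reply to {message_id} within {max_wait_time}s")
        try:
            reply = QUEUES["replies"].get(timeout=remaining)
        except queue.Empty:
            continue  # the loop re-checks the deadline and raises TimeoutError
        if reply.correlation_id == message_id:
            return reply
        # Anything else (for example a late reply to an earlier request) is skipped.
```

Running `remote_service` in a `threading.Thread` and calling `request_reply("ping")` returns the matching reply even if an unrelated message is already sitting in the reply queue. The sketch assumes one requester per reply queue; sharing a reply queue between concurrent requesters would need something extra to route replies, which is outside what this toy shows.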
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
github-actions[bot] commented on PR #44675: URL: https://github.com/apache/airflow/pull/44675#issuecomment-2632467029 This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
perry2of5 commented on code in PR #44675: URL: https://github.com/apache/airflow/pull/44675#discussion_r1881167025

## providers/src/airflow/providers/microsoft/azure/operators/asb.py: ##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
                 self.log.info("Topic %s deleted.", self.topic_name)
             else:
                 self.log.info("Topic %s does not exist.", self.topic_name)
+
+
+class AzureServiceBusRequestReplyOperator(BaseOperator):
+    """
+    Implement request-reply pattern using Azure Service Bus.
+
+    Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service
+    Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf,
+    Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
+
+    Steps are:
+    1. Generate a unique ID for the message. The subscription needs to exist before the request message is

Review Comment:
Ah, you mean under the top-level docs folder. It never occurred to me that the providers would be documented outside the top-level providers folder. I'm happy to move things over. I'll try to get to that later this week.
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
perry2of5 commented on code in PR #44675: URL: https://github.com/apache/airflow/pull/44675#discussion_r1880807495

## providers/src/airflow/providers/microsoft/azure/operators/asb.py: ##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
                 self.log.info("Topic %s deleted.", self.topic_name)
             else:
                 self.log.info("Topic %s does not exist.", self.topic_name)
+
+
+class AzureServiceBusRequestReplyOperator(BaseOperator):
+    """
+    Implement request-reply pattern using Azure Service Bus.
+
+    Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service
+    Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf,
+    Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
+
+    Steps are:
+    1. Generate a unique ID for the message. The subscription needs to exist before the request message is

Review Comment:
That seems fair. I'm not sure where to move it to, though. I don't see documentation outside of the docstrings. Sorry if I'm missing something obvious; I'm not really a Python dev.
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
perry2of5 commented on PR #44675: URL: https://github.com/apache/airflow/pull/44675#issuecomment-2536947290 It seems to me if a message is sent from an airflow DAG then the DAG author probably wants a message back at some point to confirm completion, check for errors, et cetera. To the best of my knowledge, the logic in this PR implements the standard design pattern for doing that.
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
eladkal commented on code in PR #44675: URL: https://github.com/apache/airflow/pull/44675#discussion_r1880484135

## providers/src/airflow/providers/microsoft/azure/operators/asb.py: ##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
                 self.log.info("Topic %s deleted.", self.topic_name)
             else:
                 self.log.info("Topic %s does not exist.", self.topic_name)
+
+
+class AzureServiceBusRequestReplyOperator(BaseOperator):
+    """
+    Implement request-reply pattern using Azure Service Bus.
+
+    Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service
+    Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf,
+    Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
+
+    Steps are:
+    1. Generate a unique ID for the message. The subscription needs to exist before the request message is

Review Comment:
Most of what you write here needs to be in the docs, not in the class docstring.
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
perry2of5 commented on PR #44675: URL: https://github.com/apache/airflow/pull/44675#issuecomment-2536375618 When I was working on this new operator I thought about moving some of those down into the hooks so I could reuse. I'll go ahead and put in a PR to address that and then update this PR to use those. It will reduce duplication and be a good thing. I'll try and do it this afternoon...we'll see, I have day-job work to do suddenly.
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
dabla commented on PR #44675: URL: https://github.com/apache/airflow/pull/44675#issuecomment-2533943215

> Just want to check, right now several of the existing operators get a handle to the connection and call the Microsoft Azure library directly. These should really be refactored down into the hook as well, right? For example this: https://github.com/apache/airflow/blob/main/providers/src/airflow/providers/microsoft/azure/operators/asb.py#L316-L339

Good catch. Indeed, it would be better for this code to reside within the hook: the hook should take care of the connection handling and expose the logic through a public method, which in turn is called from the operator. That way the same operation can also be executed from the hook within a PythonOperator. But don't worry, there are unfortunately still a lot of operators written that way; if we clean up every time we need to modify an operator, we will get there one day :-)
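The split described above can be sketched roughly as follows. This is only an illustration with stand-in classes: `FakeServiceBusClient`, `ServiceBusHookSketch`, `SendMessageOperatorSketch`, and `python_callable` are hypothetical names, and nothing here imports Airflow or the Azure SDK. A real hook would presumably subclass Airflow's `BaseHook` and build its client from the stored connection.

```python
from typing import Any, Callable, Dict, List, Tuple


class FakeServiceBusClient:
    """Stand-in for an SDK client; it only records what was sent."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, str]] = []

    def send(self, queue_name: str, body: str) -> None:
        self.sent.append((queue_name, body))


class ServiceBusHookSketch:
    """Owns connection handling and exposes the operation as a public method."""

    def __init__(self, conn_id: str, client_factory: Callable[[], Any] = FakeServiceBusClient) -> None:
        self.conn_id = conn_id
        self._client_factory = client_factory
        self._client: Any = None

    def get_conn(self) -> Any:
        # Create the client lazily and reuse it afterwards.
        if self._client is None:
            self._client = self._client_factory()
        return self._client

    def send_message(self, queue_name: str, body: str) -> None:
        self.get_conn().send(queue_name, body)


class SendMessageOperatorSketch:
    """Thin operator: keeps its parameters and delegates the work to the hook."""

    def __init__(self, queue_name: str, body: str, hook: ServiceBusHookSketch) -> None:
        self.queue_name = queue_name
        self.body = body
        self.hook = hook

    def execute(self, context: Dict[str, Any]) -> None:
        self.hook.send_message(self.queue_name, self.body)


def python_callable(hook: ServiceBusHookSketch) -> None:
    """The same hook method, called from a plain function instead of an operator."""
    hook.send_message("orders", "sent from a plain callable")
```

With this shape the operator contains no SDK calls of its own, so the same `send_message` logic is available to any other caller that can construct the hook.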
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
perry2of5 commented on PR #44675: URL: https://github.com/apache/airflow/pull/44675#issuecomment-2532375809 Just want to check, right now several of the existing operators get a handle to the connection and call the Microsoft Azure library directly. These should really be refactored down into the hook as well, right? For example this: https://github.com/apache/airflow/blob/main/providers/src/airflow/providers/microsoft/azure/operators/asb.py#L316-L339
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
perry2of5 commented on PR #44675: URL: https://github.com/apache/airflow/pull/44675#issuecomment-2532331911 Thank you for the review.
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
dabla commented on code in PR #44675: URL: https://github.com/apache/airflow/pull/44675#discussion_r1877459842

## providers/src/airflow/providers/microsoft/azure/operators/asb.py: ##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
                 self.log.info("Topic %s deleted.", self.topic_name)
             else:
                 self.log.info("Topic %s does not exist.", self.topic_name)
+
+
+class AzureServiceBusRequestReplyOperator(BaseOperator):
+    """
+    Implement request-reply pattern using Azure Service Bus.
+
+    Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service
+    Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf,
+    Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
+
+    Steps are:
+    1. Generate a unique ID for the message. The subscription needs to exist before the request message is
+       sent or there will be a race condition where the reply message could be sent before the
+       subscription is created. By default, a UUID is used for the unique ID and this should be
+       reasonably unique over the life of the universe.
+    2. Create a subscription to the reply topic for messages where the correlation ID equals the unique ID
+       created in #1.
+    3. Send the message to the request queue with (a) the reply-to property set to the reply topic,
+       (b) the reply type property set to topic, and (c) the message ID set to the unique ID.
+    4. Wait for a reply message on the reply topic with the correlation ID set to the unique ID.
+    5. Remove the subscription on the reply topic.
+
+    The caller must pass in a generator function to create the request message body (request_body_generator)
+    from the context and an optional callback can process the reply message (reply_message_callback). This
+    callback could either detect errors and abort processing by throwing an exception or could process the
+    message body and add information into XComs for downstream tasks to use. The remote service can send back
+    the message in any format it chooses. The supplied callback should be able to handle the message format.
+
+    The remote service should reply to the topic or queue specified in the reply_to property of the request
+    message. The remote service can tell if the reply should go to a topic or a queue based on the reply_type
+    although the current implementation expects all replies to be sent through a topic. The reply message
+    should have the correlation ID set to the message ID of the request message.
+
+    :param request_queue_name: Name of the queue to send the request to. This queue must be reachable from
+        a connection created from the connection name specified in the param `azure_service_bus_conn_id`.
+    :param request_body_generator: A method to generate the request message body from the context.
+    :param reply_topic_name: Name of the topic to send the reply to. This topic must be reachable from
+        a connection created from the connection name specified in the param "azure_service_bus_conn_id".
+    :param reply_correlation_id: a string to use to correlate the request and reply. This will also be the
+        message ID of the request message. If not specified, a UUID will be generated and used. Generally,
+        the default behavior should be used.
+    :param max_wait_time: maximum wait for a reply in seconds. This should be set to some small multiple of
+        the expected processing time. Perhaps 3x the expected processing time. Default is 60 seconds.
+    :param reply_message_callback: An optional callback to handle the response message. This takes the service
+        bus message and the context as parameters and can raise an exception to abort processing or insert
+        values into the XCOM for downstream tasks to use.
+    :param azure_service_bus_conn_id: ID of the airflow connection to the Azure Service Bus. It defaults to
+        `azure_service_bus_default`,
+    """
+
+    REPLY_SUBSCRIPTION_PREFIX = "reply-"
+    REPLY_RULE_SUFFIX = "-rule"
+
+    def __init__(
+        self,
+        *,
+        request_queue_name: str,
+        request_body_generator: Callable[[Context], str],
+        reply_topic_name: str,
+        reply_correlation_id: str | None = None,
+        max_wait_time: float = 60,
+        reply_message_callback: MessageCallback | None = None,
+        azure_service_bus_conn_id: str = "azure_service_bus_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.request_queue_name = request_queue_name
+        self.request_body_generator = request_body_generator
+        self.reply_topic_name = reply_topic_name
+        self.reply_correlation_id = reply_correlation_id
+
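
For readers following the quoted docstring, the two caller-supplied callables it describes might look something like the sketch below. It assumes a JSON payload with `status`, `result`, and `error` fields, which is purely an illustrative convention since the docstring leaves the reply format up to the remote service. `FakeReply` and `FakeTaskInstance` are hypothetical stand-ins; the real callback would receive the SDK's received-message object and an Airflow context.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class FakeReply:
    """Stand-in for a received Service Bus message; only a text body is modelled."""

    body: str


@dataclass
class FakeTaskInstance:
    """Stand-in for the task instance found in an Airflow context under 'ti'."""

    xcoms: Dict[str, Any] = field(default_factory=dict)

    def xcom_push(self, key: str, value: Any) -> None:
        self.xcoms[key] = value


def request_body_generator(context: Dict[str, Any]) -> str:
    """Build the request body from the context (the JSON layout is an assumption)."""
    return json.dumps({"run_id": context["run_id"], "action": "process"})


def reply_message_callback(message: FakeReply, context: Dict[str, Any]) -> None:
    """Abort on an error reply, otherwise publish the result for downstream tasks."""
    reply = json.loads(message.body)
    if reply.get("status") != "ok":
        raise RuntimeError(f"Remote service reported failure: {reply.get('error')}")
    context["ti"].xcom_push(key="reply_result", value=reply.get("result"))
```

Raising from the callback corresponds to the "abort processing" branch in the docstring, and the `xcom_push` call corresponds to adding information for downstream tasks.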
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
dabla commented on code in PR #44675: URL: https://github.com/apache/airflow/pull/44675#discussion_r1878295123 ## providers/src/airflow/providers/microsoft/azure/operators/asb.py: ## @@ -650,3 +653,189 @@ def execute(self, context: Context) -> None: self.log.info("Topic %s deleted.", self.topic_name) else: self.log.info("Topic %s does not exist.", self.topic_name) + + +class AzureServiceBusRequestReplyOperator(BaseOperator): +""" +Implement request-reply pattern using Azure Service Bus. + +Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service +Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf, +Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html + +Steps are: +1. Generate a unique ID for the message. The subscription needs to exist before the request message is +sent or there will be a race condition where the reply message could be sent before the +subscription is created. By default, a UUID is used for the unique ID and this should be +reasonably unique over the life of the universe. +2. Create a subscription to the reply topic for messages where the correlation ID equals the unique ID +created in #1. +3. Send the message to the request queue with (a) the reply-to property set to the reply topic, +(b) the reply type property set to topic, and (c) the message ID set to the unique ID. +4. Wait for a reply message on the reply topic with the correlation ID set to the unique ID. +5. Remove the subscription on the reply topic. + +The caller must pass in a generator function to create the request message body (request_body_generator) +from the context and an optional callback can process the reply message (reply_message_callback). This +callback could either detect errors and abort processing by throwing an exception or could process the +message body and add information into XComs for downstream tasks to use. 
The remote service can send back +the message in any format it chooses. The supplied callback should be able to handle the message format. + +The remote service should reply to the topic or queue specified in the reply_to property of the request +message. The remote service can tell if the reply should go to a topic or a queue based on the reply_type +although the current implementation expects all replies to be sent through a topic. The reply message +should have the correlation ID set to the message ID of the request message. + +:param request_queue_name: Name of the queue to send the request to. This queue must be reachable from +a connection created from the connection name specified in the param `azure_service_bus_conn_id`. +:param request_body_generator: A method to generate the request message body from the context. +:param reply_topic_name: Name of the topic to send the reply to. This topic must be reachable from +a connection created from the connection name specified in the param "azure_service_bus_conn_id". +:param reply_correlation_id: a string to use to correlate the request and reply. This will also be the +message ID of the request message. If not specified, a UUID will be generated and used. Generally, +the default behavior should be used. +:param max_wait_time: maximum wait for a reply in seconds. This should be set to some small multiple of +the expected processing time. Perhaps 3x the expected processing time. Default is 60 seconds. +:param reply_message_callback: An optional callback to handle the response message. This takes the service +bus message and the context as parameters and can raise an exception to abort processing or insert +values into the XCOM for downstream tasks to use. +:param azure_service_bus_conn_id: ID of the airflow connection to the Azure Service Bus. 
It defaults to +`azure_service_bus_default`, +""" + +REPLY_SUBSCRIPTION_PREFIX = "reply-" +REPLY_RULE_SUFFIX = "-rule" + +def __init__( +self, +*, +request_queue_name: str, +request_body_generator: Callable[[Context], str], +reply_topic_name: str, +reply_correlation_id: str | None = None, +max_wait_time: float = 60, +reply_message_callback: MessageCallback | None = None, +azure_service_bus_conn_id: str = "azure_service_bus_default", +**kwargs, +) -> None: +super().__init__(**kwargs) +self.request_queue_name = request_queue_name +self.request_body_generator = request_body_generator +self.reply_topic_name = reply_topic_name +self.reply_correlation_id = reply_correlation_id +
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
dabla commented on code in PR #44675: URL: https://github.com/apache/airflow/pull/44675#discussion_r1877457974 ## providers/src/airflow/providers/microsoft/azure/operators/asb.py: ## @@ -650,3 +653,189 @@ def execute(self, context: Context) -> None: self.log.info("Topic %s deleted.", self.topic_name) else: self.log.info("Topic %s does not exist.", self.topic_name) + + +class AzureServiceBusRequestReplyOperator(BaseOperator): +""" +Implement request-reply pattern using Azure Service Bus. + +Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service +Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf, +Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html + +Steps are: +1. Generate a unique ID for the message. The subscription needs to exist before the request message is +sent or there will be a race condition where the reply message could be sent before the +subscription is created. By default, a UUID is used for the unique ID and this should be +reasonably unique over the life of the universe. +2. Create a subscription to the reply topic for messages where the correlation ID equals the unique ID +created in #1. +3. Send the message to the request queue with (a) the reply-to property set to the reply topic, +(b) the reply type property set to topic, and (c) the message ID set to the unique ID. +4. Wait for a reply message on the reply topic with the correlation ID set to the unique ID. +5. Remove the subscription on the reply topic. + +The caller must pass in a generator function to create the request message body (request_body_generator) +from the context and an optional callback can process the reply message (reply_message_callback). This +callback could either detect errors and abort processing by throwing an exception or could process the +message body and add information into XComs for downstream tasks to use. 
The remote service can send back +the message in any format it chooses. The supplied callback should be able to handle the message format. + +The remote service should reply to the topic or queue specified in the reply_to property of the request +message. The remote service can tell if the reply should go to a topic or a queue based on the reply_type +although the current implementation expects all replies to be sent through a topic. The reply message +should have the correlation ID set to the message ID of the request message. + +:param request_queue_name: Name of the queue to send the request to. This queue must be reachable from +a connection created from the connection name specified in the param `azure_service_bus_conn_id`. +:param request_body_generator: A method to generate the request message body from the context. +:param reply_topic_name: Name of the topic to send the reply to. This topic must be reachable from +a connection created from the connection name specified in the param "azure_service_bus_conn_id". +:param reply_correlation_id: a string to use to correlate the request and reply. This will also be the +message ID of the request message. If not specified, a UUID will be generated and used. Generally, +the default behavior should be used. +:param max_wait_time: maximum wait for a reply in seconds. This should be set to some small multiple of +the expected processing time. Perhaps 3x the expected processing time. Default is 60 seconds. +:param reply_message_callback: An optional callback to handle the response message. This takes the service +bus message and the context as parameters and can raise an exception to abort processing or insert +values into the XCOM for downstream tasks to use. +:param azure_service_bus_conn_id: ID of the airflow connection to the Azure Service Bus. 
It defaults to +`azure_service_bus_default`, +""" + +REPLY_SUBSCRIPTION_PREFIX = "reply-" +REPLY_RULE_SUFFIX = "-rule" + +def __init__( +self, +*, +request_queue_name: str, +request_body_generator: Callable[[Context], str], +reply_topic_name: str, +reply_correlation_id: str | None = None, +max_wait_time: float = 60, +reply_message_callback: MessageCallback | None = None, +azure_service_bus_conn_id: str = "azure_service_bus_default", +**kwargs, +) -> None: +super().__init__(**kwargs) +self.request_queue_name = request_queue_name +self.request_body_generator = request_body_generator +self.reply_topic_name = reply_topic_name +self.reply_correlation_id = reply_correlation_id +
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
dabla commented on code in PR #44675: URL: https://github.com/apache/airflow/pull/44675#discussion_r1878295838 ## providers/src/airflow/providers/microsoft/azure/operators/asb.py: ## @@ -650,3 +653,189 @@ def execute(self, context: Context) -> None: self.log.info("Topic %s deleted.", self.topic_name) else: self.log.info("Topic %s does not exist.", self.topic_name) + + +class AzureServiceBusRequestReplyOperator(BaseOperator): +""" +Implement request-reply pattern using Azure Service Bus. + +Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service +Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf, +Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html + +Steps are: +1. Generate a unique ID for the message. The subscription needs to exist before the request message is +sent or there will be a race condition where the reply message could be sent before the +subscription is created. By default, a UUID is used for the unique ID and this should be +reasonably unique over the life of the universe. +2. Create a subscription to the reply topic for messages where the correlation ID equals the unique ID +created in #1. +3. Send the message to the request queue with (a) the reply-to property set to the reply topic, +(b) the reply type property set to topic, and (c) the message ID set to the unique ID. +4. Wait for a reply message on the reply topic with the correlation ID set to the unique ID. +5. Remove the subscription on the reply topic. + +The caller must pass in a generator function to create the request message body (request_body_generator) +from the context and an optional callback can process the reply message (reply_message_callback). This +callback could either detect errors and abort processing by throwing an exception or could process the +message body and add information into XComs for downstream tasks to use. 
The remote service can send back +the message in any format it chooses. The supplied callback should be able to handle the message format. + +The remote service should reply to the topic or queue specified in the reply_to property of the request +message. The remote service can tell if the reply should go to a topic or a queue based on the reply_type +although the current implementation expects all replies to be sent through a topic. The reply message +should have the correlation ID set to the message ID of the request message. + +:param request_queue_name: Name of the queue to send the request to. This queue must be reachable from +a connection created from the connection name specified in the param `azure_service_bus_conn_id`. +:param request_body_generator: A method to generate the request message body from the context. +:param reply_topic_name: Name of the topic to send the reply to. This topic must be reachable from +a connection created from the connection name specified in the param "azure_service_bus_conn_id". +:param reply_correlation_id: a string to use to correlate the request and reply. This will also be the +message ID of the request message. If not specified, a UUID will be generated and used. Generally, +the default behavior should be used. +:param max_wait_time: maximum wait for a reply in seconds. This should be set to some small multiple of +the expected processing time. Perhaps 3x the expected processing time. Default is 60 seconds. +:param reply_message_callback: An optional callback to handle the response message. This takes the service +bus message and the context as parameters and can raise an exception to abort processing or insert +values into the XCOM for downstream tasks to use. +:param azure_service_bus_conn_id: ID of the airflow connection to the Azure Service Bus. 
It defaults to +`azure_service_bus_default`, +""" + +REPLY_SUBSCRIPTION_PREFIX = "reply-" +REPLY_RULE_SUFFIX = "-rule" + +def __init__( +self, +*, +request_queue_name: str, +request_body_generator: Callable[[Context], str], +reply_topic_name: str, +reply_correlation_id: str | None = None, +max_wait_time: float = 60, +reply_message_callback: MessageCallback | None = None, +azure_service_bus_conn_id: str = "azure_service_bus_default", +**kwargs, +) -> None: +super().__init__(**kwargs) +self.request_queue_name = request_queue_name +self.request_body_generator = request_body_generator +self.reply_topic_name = reply_topic_name +self.reply_correlation_id = reply_correlation_id +
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1878295123

## providers/src/airflow/providers/microsoft/azure/operators/asb.py: ##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
             self.log.info("Topic %s deleted.", self.topic_name)
         else:
             self.log.info("Topic %s does not exist.", self.topic_name)
+
+
+class AzureServiceBusRequestReplyOperator(BaseOperator):
+    """
+    Implement request-reply pattern using Azure Service Bus.
+
+    Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service
+    Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf,
+    Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
+
+    Steps are:
+    1. Generate a unique ID for the message. The subscription needs to exist before the request message is
+    sent or there will be a race condition where the reply message could be sent before the
+    subscription is created. By default, a UUID is used for the unique ID and this should be
+    reasonably unique over the life of the universe.
+    2. Create a subscription to the reply topic for messages where the correlation ID equals the unique ID
+    created in #1.
+    3. Send the message to the request queue with (a) the reply-to property set to the reply topic,
+    (b) the reply type property set to topic, and (c) the message ID set to the unique ID.
+    4. Wait for a reply message on the reply topic with the correlation ID set to the unique ID.
+    5. Remove the subscription on the reply topic.
+
+    The caller must pass in a generator function to create the request message body (request_body_generator)
+    from the context and an optional callback can process the reply message (reply_message_callback). This
+    callback could either detect errors and abort processing by throwing an exception or could process the
+    message body and add information into XComs for downstream tasks to use. The remote service can send back
+    the message in any format it chooses. The supplied callback should be able to handle the message format.
+
+    The remote service should reply to the topic or queue specified in the reply_to property of the request
+    message. The remote service can tell if the reply should go to a topic or a queue based on the reply_type
+    although the current implementation expects all replies to be sent through a topic. The reply message
+    should have the correlation ID set to the message ID of the request message.
+
+    :param request_queue_name: Name of the queue to send the request to. This queue must be reachable from
+    a connection created from the connection name specified in the param `azure_service_bus_conn_id`.
+    :param request_body_generator: A method to generate the request message body from the context.
+    :param reply_topic_name: Name of the topic to send the reply to. This topic must be reachable from
+    a connection created from the connection name specified in the param "azure_service_bus_conn_id".
+    :param reply_correlation_id: a string to use to correlate the request and reply. This will also be the
+    message ID of the request message. If not specified, a UUID will be generated and used. Generally,
+    the default behavior should be used.
+    :param max_wait_time: maximum wait for a reply in seconds. This should be set to some small multiple of
+    the expected processing time. Perhaps 3x the expected processing time. Default is 60 seconds.
+    :param reply_message_callback: An optional callback to handle the response message. This takes the service
+    bus message and the context as parameters and can raise an exception to abort processing or insert
+    values into the XCOM for downstream tasks to use.
+    :param azure_service_bus_conn_id: ID of the airflow connection to the Azure Service Bus. It defaults to
+    `azure_service_bus_default`,
+    """
+
+    REPLY_SUBSCRIPTION_PREFIX = "reply-"
+    REPLY_RULE_SUFFIX = "-rule"
+
+    def __init__(
+        self,
+        *,
+        request_queue_name: str,
+        request_body_generator: Callable[[Context], str],
+        reply_topic_name: str,
+        reply_correlation_id: str | None = None,
+        max_wait_time: float = 60,
+        reply_message_callback: MessageCallback | None = None,
+        azure_service_bus_conn_id: str = "azure_service_bus_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.request_queue_name = request_queue_name
+        self.request_body_generator = request_body_generator
+        self.reply_topic_name = reply_topic_name
+        self.reply_correlation_id = reply_correlation_id
+
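The five steps listed in the quoted docstring can be sketched with an in-memory stand-in for the broker. This is only an illustration of the ordering (subscribe before sending, clean up afterwards), not the operator's implementation and not the Azure Service Bus SDK: `InMemoryBroker`, `request_reply`, `echo_service`, and the `"reply-topic"` name are all hypothetical.

```python
import queue
import threading
import uuid


class InMemoryBroker:
    """Hypothetical stand-in for a broker; NOT the Azure Service Bus API."""

    def __init__(self):
        self.request_queue = queue.Queue()
        self.subscriptions = {}  # correlation ID -> queue of replies
        self._lock = threading.Lock()

    def create_subscription(self, correlation_id):
        reply_queue = queue.Queue()
        with self._lock:
            self.subscriptions[correlation_id] = reply_queue
        return reply_queue

    def delete_subscription(self, correlation_id):
        with self._lock:
            self.subscriptions.pop(correlation_id, None)

    def publish_reply(self, correlation_id, body):
        # Deliver only to the subscription filtering on this correlation ID.
        with self._lock:
            reply_queue = self.subscriptions.get(correlation_id)
        if reply_queue is not None:
            reply_queue.put(body)


def request_reply(broker, body, max_wait_time=60.0):
    message_id = str(uuid.uuid4())                       # step 1: unique ID
    reply_queue = broker.create_subscription(message_id)  # step 2: subscribe first
    try:
        broker.request_queue.put(                        # step 3: send the request
            {
                "message_id": message_id,
                "reply_to": "reply-topic",  # assumed topic name
                "reply_type": "topic",
                "body": body,
            }
        )
        try:
            return reply_queue.get(timeout=max_wait_time)  # step 4: wait for reply
        except queue.Empty:
            raise TimeoutError(f"No reply within {max_wait_time} seconds") from None
    finally:
        broker.delete_subscription(message_id)           # step 5: always clean up


def echo_service(broker):
    """Hypothetical remote service: replies with correlation ID = request message ID."""
    request = broker.request_queue.get()
    broker.publish_reply(request["message_id"], "echo: " + request["body"])
```

Starting `echo_service` in a background thread and then calling `request_reply(broker, "ping", max_wait_time=5.0)` exercises the full round trip. Creating the subscription before the send is what avoids the race described in step 1, and the `finally` block removes the subscription even when the wait times out.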
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1878294037

## providers/src/airflow/providers/microsoft/azure/operators/asb.py: ##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
             self.log.info("Topic %s deleted.", self.topic_name)
         else:
             self.log.info("Topic %s does not exist.", self.topic_name)
+
+
+class AzureServiceBusRequestReplyOperator(BaseOperator):
+    """
+    Implement request-reply pattern using Azure Service Bus.
+
+    Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service
+    Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf,
+    Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
+
+    Steps are:
+    1. Generate a unique ID for the message. The subscription needs to exist before the request message is
+    sent or there will be a race condition where the reply message could be sent before the
+    subscription is created. By default, a UUID is used for the unique ID and this should be
+    reasonably unique over the life of the universe.
+    2. Create a subscription to the reply topic for messages where the correlation ID equals the unique ID
+    created in #1.
+    3. Send the message to the request queue with (a) the reply-to property set to the reply topic,
+    (b) the reply type property set to topic, and (c) the message ID set to the unique ID.
+    4. Wait for a reply message on the reply topic with the correlation ID set to the unique ID.
+    5. Remove the subscription on the reply topic.
+
+    The caller must pass in a generator function to create the request message body (request_body_generator)
+    from the context and an optional callback can process the reply message (reply_message_callback). This
+    callback could either detect errors and abort processing by throwing an exception or could process the
+    message body and add information into XComs for downstream tasks to use. The remote service can send back
+    the message in any format it chooses. The supplied callback should be able to handle the message format.
+
+    The remote service should reply to the topic or queue specified in the reply_to property of the request
+    message. The remote service can tell if the reply should go to a topic or a queue based on the reply_type
+    although the current implementation expects all replies to be sent through a topic. The reply message
+    should have the correlation ID set to the message ID of the request message.
+
+    :param request_queue_name: Name of the queue to send the request to. This queue must be reachable from
+    a connection created from the connection name specified in the param `azure_service_bus_conn_id`.
+    :param request_body_generator: A method to generate the request message body from the context.
+    :param reply_topic_name: Name of the topic to send the reply to. This topic must be reachable from
+    a connection created from the connection name specified in the param "azure_service_bus_conn_id".
+    :param reply_correlation_id: a string to use to correlate the request and reply. This will also be the
+    message ID of the request message. If not specified, a UUID will be generated and used. Generally,
+    the default behavior should be used.
+    :param max_wait_time: maximum wait for a reply in seconds. This should be set to some small multiple of
+    the expected processing time. Perhaps 3x the expected processing time. Default is 60 seconds.
+    :param reply_message_callback: An optional callback to handle the response message. This takes the service
+    bus message and the context as parameters and can raise an exception to abort processing or insert
+    values into the XCOM for downstream tasks to use.
+    :param azure_service_bus_conn_id: ID of the airflow connection to the Azure Service Bus. It defaults to
+    `azure_service_bus_default`,
+    """
+
+    REPLY_SUBSCRIPTION_PREFIX = "reply-"
+    REPLY_RULE_SUFFIX = "-rule"
+
+    def __init__(
+        self,
+        *,
+        request_queue_name: str,
+        request_body_generator: Callable[[Context], str],
+        reply_topic_name: str,
+        reply_correlation_id: str | None = None,
+        max_wait_time: float = 60,
+        reply_message_callback: MessageCallback | None = None,
+        azure_service_bus_conn_id: str = "azure_service_bus_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.request_queue_name = request_queue_name
+        self.request_body_generator = request_body_generator
+        self.reply_topic_name = reply_topic_name
+        self.reply_correlation_id = reply_correlation_id
+
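The quoted docstring says `reply_message_callback` receives the service bus message and the context, and may either raise to abort or push values to XCom. A minimal sketch of such a callback follows. The JSON reply shape (`status`, `result`), the XCom key `reply_result`, and the `FakeTaskInstance` stand-in are assumptions made for illustration; the sketch also assumes `str(message)` yields the message body as text.

```python
import json


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance; records pushed XComs."""

    def __init__(self):
        self.xcoms = {}

    def xcom_push(self, key, value):
        self.xcoms[key] = value


def reply_message_callback(message, context):
    # Assumption: the remote service replies with a JSON body such as
    # {"status": "ok", "result": ...}; the docstring leaves the format open.
    payload = json.loads(str(message))
    if payload.get("status") != "ok":
        # Raising aborts processing, as the docstring describes.
        raise RuntimeError(f"Remote service reported failure: {payload}")
    # Otherwise hand the result to downstream tasks via XCom.
    context["ti"].xcom_push(key="reply_result", value=payload["result"])
```

Because the remote service may reply in any format, the parsing step is the part most likely to differ in practice; the raise-or-push structure is the piece the docstring actually prescribes.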
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1878291301

## providers/src/airflow/providers/microsoft/azure/operators/asb.py: ##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
             self.log.info("Topic %s deleted.", self.topic_name)
         else:
             self.log.info("Topic %s does not exist.", self.topic_name)
+
+
+class AzureServiceBusRequestReplyOperator(BaseOperator):
+    """
+    Implement request-reply pattern using Azure Service Bus.
+
+    Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service
+    Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf,
+    Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
+
+    Steps are:
+    1. Generate a unique ID for the message. The subscription needs to exist before the request message is
+    sent or there will be a race condition where the reply message could be sent before the
+    subscription is created. By default, a UUID is used for the unique ID and this should be
+    reasonably unique over the life of the universe.
+    2. Create a subscription to the reply topic for messages where the correlation ID equals the unique ID
+    created in #1.
+    3. Send the message to the request queue with (a) the reply-to property set to the reply topic,
+    (b) the reply type property set to topic, and (c) the message ID set to the unique ID.
+    4. Wait for a reply message on the reply topic with the correlation ID set to the unique ID.
+    5. Remove the subscription on the reply topic.
+
+    The caller must pass in a generator function to create the request message body (request_body_generator)
+    from the context and an optional callback can process the reply message (reply_message_callback). This
+    callback could either detect errors and abort processing by throwing an exception or could process the
+    message body and add information into XComs for downstream tasks to use. The remote service can send back
+    the message in any format it chooses. The supplied callback should be able to handle the message format.
+
+    The remote service should reply to the topic or queue specified in the reply_to property of the request
+    message. The remote service can tell if the reply should go to a topic or a queue based on the reply_type
+    although the current implementation expects all replies to be sent through a topic. The reply message
+    should have the correlation ID set to the message ID of the request message.
+
+    :param request_queue_name: Name of the queue to send the request to. This queue must be reachable from
+    a connection created from the connection name specified in the param `azure_service_bus_conn_id`.
+    :param request_body_generator: A method to generate the request message body from the context.
+    :param reply_topic_name: Name of the topic to send the reply to. This topic must be reachable from
+    a connection created from the connection name specified in the param "azure_service_bus_conn_id".
+    :param reply_correlation_id: a string to use to correlate the request and reply. This will also be the
+    message ID of the request message. If not specified, a UUID will be generated and used. Generally,
+    the default behavior should be used.
+    :param max_wait_time: maximum wait for a reply in seconds. This should be set to some small multiple of
+    the expected processing time. Perhaps 3x the expected processing time. Default is 60 seconds.
+    :param reply_message_callback: An optional callback to handle the response message. This takes the service
+    bus message and the context as parameters and can raise an exception to abort processing or insert
+    values into the XCOM for downstream tasks to use.
+    :param azure_service_bus_conn_id: ID of the airflow connection to the Azure Service Bus. It defaults to
+    `azure_service_bus_default`,
+    """
+
+    REPLY_SUBSCRIPTION_PREFIX = "reply-"
+    REPLY_RULE_SUFFIX = "-rule"
+
+    def __init__(
+        self,
+        *,
+        request_queue_name: str,
+        request_body_generator: Callable[[Context], str],
+        reply_topic_name: str,
+        reply_correlation_id: str | None = None,
+        max_wait_time: float = 60,
+        reply_message_callback: MessageCallback | None = None,
+        azure_service_bus_conn_id: str = "azure_service_bus_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.request_queue_name = request_queue_name
+        self.request_body_generator = request_body_generator
+        self.reply_topic_name = reply_topic_name
+        self.reply_correlation_id = reply_correlation_id
+
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1877459842

## providers/src/airflow/providers/microsoft/azure/operators/asb.py: ##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
             self.log.info("Topic %s deleted.", self.topic_name)
         else:
             self.log.info("Topic %s does not exist.", self.topic_name)
+
+
+class AzureServiceBusRequestReplyOperator(BaseOperator):
+    """
+    Implement request-reply pattern using Azure Service Bus.
+
+    Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service
+    Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf,
+    Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
+
+    Steps are:
+    1. Generate a unique ID for the message. The subscription needs to exist before the request message is
+    sent or there will be a race condition where the reply message could be sent before the
+    subscription is created. By default, a UUID is used for the unique ID and this should be
+    reasonably unique over the life of the universe.
+    2. Create a subscription to the reply topic for messages where the correlation ID equals the unique ID
+    created in #1.
+    3. Send the message to the request queue with (a) the reply-to property set to the reply topic,
+    (b) the reply type property set to topic, and (c) the message ID set to the unique ID.
+    4. Wait for a reply message on the reply topic with the correlation ID set to the unique ID.
+    5. Remove the subscription on the reply topic.
+
+    The caller must pass in a generator function to create the request message body (request_body_generator)
+    from the context and an optional callback can process the reply message (reply_message_callback). This
+    callback could either detect errors and abort processing by throwing an exception or could process the
+    message body and add information into XComs for downstream tasks to use. The remote service can send back
+    the message in any format it chooses. The supplied callback should be able to handle the message format.
+
+    The remote service should reply to the topic or queue specified in the reply_to property of the request
+    message. The remote service can tell if the reply should go to a topic or a queue based on the reply_type
+    although the current implementation expects all replies to be sent through a topic. The reply message
+    should have the correlation ID set to the message ID of the request message.
+
+    :param request_queue_name: Name of the queue to send the request to. This queue must be reachable from
+    a connection created from the connection name specified in the param `azure_service_bus_conn_id`.
+    :param request_body_generator: A method to generate the request message body from the context.
+    :param reply_topic_name: Name of the topic to send the reply to. This topic must be reachable from
+    a connection created from the connection name specified in the param "azure_service_bus_conn_id".
+    :param reply_correlation_id: a string to use to correlate the request and reply. This will also be the
+    message ID of the request message. If not specified, a UUID will be generated and used. Generally,
+    the default behavior should be used.
+    :param max_wait_time: maximum wait for a reply in seconds. This should be set to some small multiple of
+    the expected processing time. Perhaps 3x the expected processing time. Default is 60 seconds.
+    :param reply_message_callback: An optional callback to handle the response message. This takes the service
+    bus message and the context as parameters and can raise an exception to abort processing or insert
+    values into the XCOM for downstream tasks to use.
+    :param azure_service_bus_conn_id: ID of the airflow connection to the Azure Service Bus. It defaults to
+    `azure_service_bus_default`,
+    """
+
+    REPLY_SUBSCRIPTION_PREFIX = "reply-"
+    REPLY_RULE_SUFFIX = "-rule"
+
+    def __init__(
+        self,
+        *,
+        request_queue_name: str,
+        request_body_generator: Callable[[Context], str],
+        reply_topic_name: str,
+        reply_correlation_id: str | None = None,
+        max_wait_time: float = 60,
+        reply_message_callback: MessageCallback | None = None,
+        azure_service_bus_conn_id: str = "azure_service_bus_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.request_queue_name = request_queue_name
+        self.request_body_generator = request_body_generator
+        self.reply_topic_name = reply_topic_name
+        self.reply_correlation_id = reply_correlation_id
+
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1878288754

## providers/src/airflow/providers/microsoft/azure/operators/asb.py: ##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
             self.log.info("Topic %s deleted.", self.topic_name)
         else:
             self.log.info("Topic %s does not exist.", self.topic_name)
+
+
+class AzureServiceBusRequestReplyOperator(BaseOperator):
+    """
+    Implement request-reply pattern using Azure Service Bus.
+
+    Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service
+    Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf,
+    Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
+
+    Steps are:
+    1. Generate a unique ID for the message. The subscription needs to exist before the request message is
+    sent or there will be a race condition where the reply message could be sent before the
+    subscription is created. By default, a UUID is used for the unique ID and this should be
+    reasonably unique over the life of the universe.
+    2. Create a subscription to the reply topic for messages where the correlation ID equals the unique ID
+    created in #1.
+    3. Send the message to the request queue with (a) the reply-to property set to the reply topic,
+    (b) the reply type property set to topic, and (c) the message ID set to the unique ID.
+    4. Wait for a reply message on the reply topic with the correlation ID set to the unique ID.
+    5. Remove the subscription on the reply topic.
+
+    The caller must pass in a generator function to create the request message body (request_body_generator)
+    from the context and an optional callback can process the reply message (reply_message_callback). This
+    callback could either detect errors and abort processing by throwing an exception or could process the
+    message body and add information into XComs for downstream tasks to use. The remote service can send back
+    the message in any format it chooses. The supplied callback should be able to handle the message format.
+
+    The remote service should reply to the topic or queue specified in the reply_to property of the request
+    message. The remote service can tell if the reply should go to a topic or a queue based on the reply_type
+    although the current implementation expects all replies to be sent through a topic. The reply message
+    should have the correlation ID set to the message ID of the request message.
+
+    :param request_queue_name: Name of the queue to send the request to. This queue must be reachable from
+    a connection created from the connection name specified in the param `azure_service_bus_conn_id`.
+    :param request_body_generator: A method to generate the request message body from the context.
+    :param reply_topic_name: Name of the topic to send the reply to. This topic must be reachable from
+    a connection created from the connection name specified in the param "azure_service_bus_conn_id".
+    :param reply_correlation_id: a string to use to correlate the request and reply. This will also be the
+    message ID of the request message. If not specified, a UUID will be generated and used. Generally,
+    the default behavior should be used.
+    :param max_wait_time: maximum wait for a reply in seconds. This should be set to some small multiple of
+    the expected processing time. Perhaps 3x the expected processing time. Default is 60 seconds.
+    :param reply_message_callback: An optional callback to handle the response message. This takes the service
+    bus message and the context as parameters and can raise an exception to abort processing or insert
+    values into the XCOM for downstream tasks to use.
+    :param azure_service_bus_conn_id: ID of the airflow connection to the Azure Service Bus. It defaults to
+    `azure_service_bus_default`,
+    """
+
+    REPLY_SUBSCRIPTION_PREFIX = "reply-"
+    REPLY_RULE_SUFFIX = "-rule"
+
+    def __init__(
+        self,
+        *,
+        request_queue_name: str,
+        request_body_generator: Callable[[Context], str],
+        reply_topic_name: str,
+        reply_correlation_id: str | None = None,
+        max_wait_time: float = 60,
+        reply_message_callback: MessageCallback | None = None,
+        azure_service_bus_conn_id: str = "azure_service_bus_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.request_queue_name = request_queue_name
+        self.request_body_generator = request_body_generator
+        self.reply_topic_name = reply_topic_name
+        self.reply_correlation_id = reply_correlation_id
+
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1877457974

## providers/src/airflow/providers/microsoft/azure/operators/asb.py: ##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
             self.log.info("Topic %s deleted.", self.topic_name)
         else:
             self.log.info("Topic %s does not exist.", self.topic_name)
+
+
+class AzureServiceBusRequestReplyOperator(BaseOperator):
+    """
+    Implement request-reply pattern using Azure Service Bus.
+
+    Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service
+    Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf,
+    Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
+
+    Steps are:
+    1. Generate a unique ID for the message. The subscription needs to exist before the request message is
+    sent or there will be a race condition where the reply message could be sent before the
+    subscription is created. By default, a UUID is used for the unique ID and this should be
+    reasonably unique over the life of the universe.
+    2. Create a subscription to the reply topic for messages where the correlation ID equals the unique ID
+    created in #1.
+    3. Send the message to the request queue with (a) the reply-to property set to the reply topic,
+    (b) the reply type property set to topic, and (c) the message ID set to the unique ID.
+    4. Wait for a reply message on the reply topic with the correlation ID set to the unique ID.
+    5. Remove the subscription on the reply topic.
+
+    The caller must pass in a generator function to create the request message body (request_body_generator)
+    from the context and an optional callback can process the reply message (reply_message_callback). This
+    callback could either detect errors and abort processing by throwing an exception or could process the
+    message body and add information into XComs for downstream tasks to use. The remote service can send back
+    the message in any format it chooses. The supplied callback should be able to handle the message format.
+
+    The remote service should reply to the topic or queue specified in the reply_to property of the request
+    message. The remote service can tell if the reply should go to a topic or a queue based on the reply_type
+    although the current implementation expects all replies to be sent through a topic. The reply message
+    should have the correlation ID set to the message ID of the request message.
+
+    :param request_queue_name: Name of the queue to send the request to. This queue must be reachable from
+    a connection created from the connection name specified in the param `azure_service_bus_conn_id`.
+    :param request_body_generator: A method to generate the request message body from the context.
+    :param reply_topic_name: Name of the topic to send the reply to. This topic must be reachable from
+    a connection created from the connection name specified in the param "azure_service_bus_conn_id".
+    :param reply_correlation_id: a string to use to correlate the request and reply. This will also be the
+    message ID of the request message. If not specified, a UUID will be generated and used. Generally,
+    the default behavior should be used.
+    :param max_wait_time: maximum wait for a reply in seconds. This should be set to some small multiple of
+    the expected processing time. Perhaps 3x the expected processing time. Default is 60 seconds.
+    :param reply_message_callback: An optional callback to handle the response message. This takes the service
+    bus message and the context as parameters and can raise an exception to abort processing or insert
+    values into the XCOM for downstream tasks to use.
+    :param azure_service_bus_conn_id: ID of the airflow connection to the Azure Service Bus. It defaults to
+    `azure_service_bus_default`,
+    """
+
+    REPLY_SUBSCRIPTION_PREFIX = "reply-"
+    REPLY_RULE_SUFFIX = "-rule"
+
+    def __init__(
+        self,
+        *,
+        request_queue_name: str,
+        request_body_generator: Callable[[Context], str],
+        reply_topic_name: str,
+        reply_correlation_id: str | None = None,
+        max_wait_time: float = 60,
+        reply_message_callback: MessageCallback | None = None,
+        azure_service_bus_conn_id: str = "azure_service_bus_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.request_queue_name = request_queue_name
+        self.request_body_generator = request_body_generator
+        self.reply_topic_name = reply_topic_name
+        self.reply_correlation_id = reply_correlation_id
+
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
dabla commented on code in PR #44675:
URL: https://github.com/apache/airflow/pull/44675#discussion_r1877454268

## providers/src/airflow/providers/microsoft/azure/operators/asb.py: ##
@@ -650,3 +653,189 @@ def execute(self, context: Context) -> None:
             self.log.info("Topic %s deleted.", self.topic_name)
         else:
             self.log.info("Topic %s does not exist.", self.topic_name)
+
+
+class AzureServiceBusRequestReplyOperator(BaseOperator):
+    """
+    Implement request-reply pattern using Azure Service Bus.
+
+    Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service
+    Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf,
+    Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
+
+    Steps are:
+    1. Generate a unique ID for the message. The subscription needs to exist before the request message is
+    sent or there will be a race condition where the reply message could be sent before the
+    subscription is created. By default, a UUID is used for the unique ID and this should be
+    reasonably unique over the life of the universe.
+    2. Create a subscription to the reply topic for messages where the correlation ID equals the unique ID
+    created in #1.
+    3. Send the message to the request queue with (a) the reply-to property set to the reply topic,
+    (b) the reply type property set to topic, and (c) the message ID set to the unique ID.
+    4. Wait for a reply message on the reply topic with the correlation ID set to the unique ID.
+    5. Remove the subscription on the reply topic.
+
+    The caller must pass in a generator function to create the request message body (request_body_generator)
+    from the context and an optional callback can process the reply message (reply_message_callback). This
+    callback could either detect errors and abort processing by throwing an exception or could process the
+    message body and add information into XComs for downstream tasks to use. The remote service can send back
+    the message in any format it chooses. The supplied callback should be able to handle the message format.
+
+    The remote service should reply to the topic or queue specified in the reply_to property of the request
+    message. The remote service can tell if the reply should go to a topic or a queue based on the reply_type
+    although the current implementation expects all replies to be sent through a topic. The reply message
+    should have the correlation ID set to the message ID of the request message.
+
+    :param request_queue_name: Name of the queue to send the request to. This queue must be reachable from
+    a connection created from the connection name specified in the param `azure_service_bus_conn_id`.
+    :param request_body_generator: A method to generate the request message body from the context.
+    :param reply_topic_name: Name of the topic to send the reply to. This topic must be reachable from
+    a connection created from the connection name specified in the param "azure_service_bus_conn_id".
+    :param reply_correlation_id: a string to use to correlate the request and reply. This will also be the
+    message ID of the request message. If not specified, a UUID will be generated and used. Generally,
+    the default behavior should be used.
+    :param max_wait_time: maximum wait for a reply in seconds. This should be set to some small multiple of
+    the expected processing time. Perhaps 3x the expected processing time. Default is 60 seconds.
+    :param reply_message_callback: An optional callback to handle the response message. This takes the service
+    bus message and the context as parameters and can raise an exception to abort processing or insert
+    values into the XCOM for downstream tasks to use.
+    :param azure_service_bus_conn_id: ID of the airflow connection to the Azure Service Bus. It defaults to
+    `azure_service_bus_default`,
+    """
+
+    REPLY_SUBSCRIPTION_PREFIX = "reply-"
+    REPLY_RULE_SUFFIX = "-rule"
+
+    def __init__(
+        self,
+        *,
+        request_queue_name: str,
+        request_body_generator: Callable[[Context], str],
+        reply_topic_name: str,
+        reply_correlation_id: str | None = None,
+        max_wait_time: float = 60,
+        reply_message_callback: MessageCallback | None = None,
+        azure_service_bus_conn_id: str = "azure_service_bus_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.request_queue_name = request_queue_name
+        self.request_body_generator = request_body_generator
+        self.reply_topic_name = reply_topic_name
+        self.reply_correlation_id = reply_correlation_id
+
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
dabla commented on code in PR #44675: URL: https://github.com/apache/airflow/pull/44675#discussion_r1877457974 ## providers/src/airflow/providers/microsoft/azure/operators/asb.py: ## @@ -650,3 +653,189 @@ def execute(self, context: Context) -> None: self.log.info("Topic %s deleted.", self.topic_name) else: self.log.info("Topic %s does not exist.", self.topic_name) + + +class AzureServiceBusRequestReplyOperator(BaseOperator): +""" +Implement request-reply pattern using Azure Service Bus. + +Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service +Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf, +Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html + +Steps are: +1. Generate a unique ID for the message. The subscription needs to exist before the request message is +sent or there will be a race condition where the reply message could be sent before the +subscription is created. By default, a UUID is used for the unique ID and this should be +reasonably unique over the life of the universe. +2. Create a subscription to the reply topic for messages where the correlation ID equals the unique ID +created in #1. +3. Send the message to the request queue with (a) the reply-to property set to the reply topic, +(b) the reply type property set to topic, and (c) the message ID set to the unique ID. +4. Wait for a reply message on the reply topic with the correlation ID set to the unique ID. +5. Remove the subscription on the reply topic. + +The caller must pass in a generator function to create the request message body (request_body_generator) +from the context and an optional callback can process the reply message (reply_message_callback). This +callback could either detect errors and abort processing by throwing an exception or could process the +message body and add information into XComs for downstream tasks to use. 
The remote service can send back +the message in any format it chooses. The supplied callback should be able to handle the message format. + +The remote service should reply to the topic or queue specified in the reply_to property of the request +message. The remote service can tell if the reply should go to a topic or a queue based on the reply_type +although the current implementation expects all replies to be sent through a topic. The reply message +should have the correlation ID set to the message ID of the request message. + +:param request_queue_name: Name of the queue to send the request to. This queue must be reachable from +a connection created from the connection name specified in the param `azure_service_bus_conn_id`. +:param request_body_generator: A method to generate the request message body from the context. +:param reply_topic_name: Name of the topic to send the reply to. This topic must be reachable from +a connection created from the connection name specified in the param "azure_service_bus_conn_id". +:param reply_correlation_id: a string to use to correlate the request and reply. This will also be the +message ID of the request message. If not specified, a UUID will be generated and used. Generally, +the default behavior should be used. +:param max_wait_time: maximum wait for a reply in seconds. This should be set to some small multiple of +the expected processing time. Perhaps 3x the expected processing time. Default is 60 seconds. +:param reply_message_callback: An optional callback to handle the response message. This takes the service +bus message and the context as parameters and can raise an exception to abort processing or insert +values into the XCOM for downstream tasks to use. +:param azure_service_bus_conn_id: ID of the airflow connection to the Azure Service Bus. 
It defaults to +`azure_service_bus_default`, +""" + +REPLY_SUBSCRIPTION_PREFIX = "reply-" +REPLY_RULE_SUFFIX = "-rule" + +def __init__( +self, +*, +request_queue_name: str, +request_body_generator: Callable[[Context], str], +reply_topic_name: str, +reply_correlation_id: str | None = None, +max_wait_time: float = 60, +reply_message_callback: MessageCallback | None = None, +azure_service_bus_conn_id: str = "azure_service_bus_default", +**kwargs, +) -> None: +super().__init__(**kwargs) +self.request_queue_name = request_queue_name +self.request_body_generator = request_body_generator +self.reply_topic_name = reply_topic_name +self.reply_correlation_id = reply_correlation_id +
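For readers following the five steps in the quoted docstring, here is a minimal in-memory sketch of the ordering they describe. It does not use the Azure SDK: `InMemoryTopic`, `request_reply`, and the `"reply-" + message_id` subscription name are hypothetical stand-ins chosen for illustration (only the `reply-` prefix appears in the diff), and step 4 does not actually wait.

```python
import uuid
from collections import deque


class InMemoryTopic:
    """Toy stand-in for a Service Bus topic. This is NOT the Azure SDK."""

    def __init__(self):
        # subscription name -> (correlation ID to match, queued messages)
        self._subscriptions = {}

    def create_subscription(self, name, correlation_id):
        self._subscriptions[name] = (correlation_id, deque())

    def delete_subscription(self, name):
        self._subscriptions.pop(name, None)

    def subscription_names(self):
        return list(self._subscriptions)

    def publish(self, message):
        # Deliver only to subscriptions whose filter matches the correlation ID.
        # A message published before any matching subscription exists is lost,
        # which is the race the docstring warns about.
        for wanted_id, inbox in self._subscriptions.values():
            if message["correlation_id"] == wanted_id:
                inbox.append(message)

    def receive(self, name):
        _, inbox = self._subscriptions[name]
        return inbox.popleft() if inbox else None


def request_reply(send_request, reply_topic, body, reply_topic_name="reply-topic"):
    """Run the five steps against the toy topic (hypothetical helper)."""
    message_id = str(uuid.uuid4())                              # step 1
    subscription = "reply-" + message_id                        # assumed naming
    reply_topic.create_subscription(subscription, message_id)   # step 2
    try:
        send_request(                                           # step 3
            {
                "message_id": message_id,
                "reply_to": reply_topic_name,
                "reply_type": "topic",
                "body": body,
            }
        )
        return reply_topic.receive(subscription)                # step 4
    finally:
        reply_topic.delete_subscription(subscription)           # step 5
```

The `try`/`finally` is a design choice of this sketch: it removes the subscription even when sending or receiving fails, so a failed task does not leave a per-request subscription behind.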
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
dabla commented on code in PR #44675: URL: https://github.com/apache/airflow/pull/44675#discussion_r1877459842 ## providers/src/airflow/providers/microsoft/azure/operators/asb.py: ## @@ -650,3 +653,189 @@ def execute(self, context: Context) -> None: self.log.info("Topic %s deleted.", self.topic_name) else: self.log.info("Topic %s does not exist.", self.topic_name) + + +class AzureServiceBusRequestReplyOperator(BaseOperator): +""" +Implement request-reply pattern using Azure Service Bus. + +Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service +Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf, +Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html + +Steps are: +1. Generate a unique ID for the message. The subscription needs to exist before the request message is +sent or there will be a race condition where the reply message could be sent before the +subscription is created. By default, a UUID is used for the unique ID and this should be +reasonably unique over the life of the universe. +2. Create a subscription to the reply topic for messages where the correlation ID equals the unique ID +created in #1. +3. Send the message to the request queue with (a) the reply-to property set to the reply topic, +(b) the reply type property set to topic, and (c) the message ID set to the unique ID. +4. Wait for a reply message on the reply topic with the correlation ID set to the unique ID. +5. Remove the subscription on the reply topic. + +The caller must pass in a generator function to create the request message body (request_body_generator) +from the context and an optional callback can process the reply message (reply_message_callback). This +callback could either detect errors and abort processing by throwing an exception or could process the +message body and add information into XComs for downstream tasks to use. 
The remote service can send back +the message in any format it chooses. The supplied callback should be able to handle the message format. + +The remote service should reply to the topic or queue specified in the reply_to property of the request +message. The remote service can tell if the reply should go to a topic or a queue based on the reply_type +although the current implementation expects all replies to be sent through a topic. The reply message +should have the correlation ID set to the message ID of the request message. + +:param request_queue_name: Name of the queue to send the request to. This queue must be reachable from +a connection created from the connection name specified in the param `azure_service_bus_conn_id`. +:param request_body_generator: A method to generate the request message body from the context. +:param reply_topic_name: Name of the topic to send the reply to. This topic must be reachable from +a connection created from the connection name specified in the param "azure_service_bus_conn_id". +:param reply_correlation_id: a string to use to correlate the request and reply. This will also be the +message ID of the request message. If not specified, a UUID will be generated and used. Generally, +the default behavior should be used. +:param max_wait_time: maximum wait for a reply in seconds. This should be set to some small multiple of +the expected processing time. Perhaps 3x the expected processing time. Default is 60 seconds. +:param reply_message_callback: An optional callback to handle the response message. This takes the service +bus message and the context as parameters and can raise an exception to abort processing or insert +values into the XCOM for downstream tasks to use. +:param azure_service_bus_conn_id: ID of the airflow connection to the Azure Service Bus. 
It defaults to +`azure_service_bus_default`, +""" + +REPLY_SUBSCRIPTION_PREFIX = "reply-" +REPLY_RULE_SUFFIX = "-rule" + +def __init__( +self, +*, +request_queue_name: str, +request_body_generator: Callable[[Context], str], +reply_topic_name: str, +reply_correlation_id: str | None = None, +max_wait_time: float = 60, +reply_message_callback: MessageCallback | None = None, +azure_service_bus_conn_id: str = "azure_service_bus_default", +**kwargs, +) -> None: +super().__init__(**kwargs) +self.request_queue_name = request_queue_name +self.request_body_generator = request_body_generator +self.reply_topic_name = reply_topic_name +self.reply_correlation_id = reply_correlation_id +
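A hedged sketch of what the two callables described in the docstring could look like. The signatures follow the quoted diff (`Callable[[Context], str]` for the generator; message plus context for the callback). Everything else is an assumption for illustration: the JSON payload shape, the `status`/`result` keys, the XCom key, the assumption that `str(message)` yields the reply body text, and the `FakeReply` / `FakeTaskInstance` stubs that let the example run without Airflow or Azure.

```python
import json


def build_request_body(context) -> str:
    """Generate the request body from the task context."""
    return json.dumps({"run_id": context["run_id"], "action": "start"})


def handle_reply(message, context) -> None:
    """Abort on an error reply, otherwise pass the result downstream via XCom."""
    # Assumption: str(message) gives the body text of the reply message.
    payload = json.loads(str(message))
    if payload.get("status") != "ok":
        raise RuntimeError(f"remote service reported failure: {payload}")
    context["ti"].xcom_push(key="reply_result", value=payload["result"])


class FakeReply:
    """Stub reply message used only to exercise the callback."""

    def __init__(self, text):
        self._text = text

    def __str__(self):
        return self._text


class FakeTaskInstance:
    """Stub exposing only the xcom_push method used above."""

    def __init__(self):
        self.pushed = {}

    def xcom_push(self, key, value):
        self.pushed[key] = value
```

In a DAG these would be passed as `request_body_generator=build_request_body` and `reply_message_callback=handle_reply`, per the parameter names in the docstring.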
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
dabla commented on code in PR #44675: URL: https://github.com/apache/airflow/pull/44675#discussion_r1877454268 ## providers/src/airflow/providers/microsoft/azure/operators/asb.py: ## @@ -650,3 +653,189 @@ def execute(self, context: Context) -> None: self.log.info("Topic %s deleted.", self.topic_name) else: self.log.info("Topic %s does not exist.", self.topic_name) + + +class AzureServiceBusRequestReplyOperator(BaseOperator): +""" +Implement request-reply pattern using Azure Service Bus. + +Send a message to an Azure Service Bus Queue and receive a reply by correlation id from an Azure Service +Bus Topic. This implements the Request-Reply pattern from Enterprise Integration Patterns, Hohpe, Woolf, +Addison-Wesley, 2003: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html + +Steps are: +1. Generate a unique ID for the message. The subscription needs to exist before the request message is +sent or there will be a race condition where the reply message could be sent before the +subscription is created. By default, a UUID is used for the unique ID and this should be +reasonably unique over the life of the universe. +2. Create a subscription to the reply topic for messages where the correlation ID equals the unique ID +created in #1. +3. Send the message to the request queue with (a) the reply-to property set to the reply topic, +(b) the reply type property set to topic, and (c) the message ID set to the unique ID. +4. Wait for a reply message on the reply topic with the correlation ID set to the unique ID. +5. Remove the subscription on the reply topic. + +The caller must pass in a generator function to create the request message body (request_body_generator) +from the context and an optional callback can process the reply message (reply_message_callback). This +callback could either detect errors and abort processing by throwing an exception or could process the +message body and add information into XComs for downstream tasks to use. 
The remote service can send back +the message in any format it chooses. The supplied callback should be able to handle the message format. + +The remote service should reply to the topic or queue specified in the reply_to property of the request +message. The remote service can tell if the reply should go to a topic or a queue based on the reply_type +although the current implementation expects all replies to be sent through a topic. The reply message +should have the correlation ID set to the message ID of the request message. + +:param request_queue_name: Name of the queue to send the request to. This queue must be reachable from +a connection created from the connection name specified in the param `azure_service_bus_conn_id`. +:param request_body_generator: A method to generate the request message body from the context. +:param reply_topic_name: Name of the topic to send the reply to. This topic must be reachable from +a connection created from the connection name specified in the param "azure_service_bus_conn_id". +:param reply_correlation_id: a string to use to correlate the request and reply. This will also be the +message ID of the request message. If not specified, a UUID will be generated and used. Generally, +the default behavior should be used. +:param max_wait_time: maximum wait for a reply in seconds. This should be set to some small multiple of +the expected processing time. Perhaps 3x the expected processing time. Default is 60 seconds. +:param reply_message_callback: An optional callback to handle the response message. This takes the service +bus message and the context as parameters and can raise an exception to abort processing or insert +values into the XCOM for downstream tasks to use. +:param azure_service_bus_conn_id: ID of the airflow connection to the Azure Service Bus. 
It defaults to +`azure_service_bus_default`, +""" + +REPLY_SUBSCRIPTION_PREFIX = "reply-" +REPLY_RULE_SUFFIX = "-rule" + +def __init__( +self, +*, +request_queue_name: str, +request_body_generator: Callable[[Context], str], +reply_topic_name: str, +reply_correlation_id: str | None = None, +max_wait_time: float = 60, +reply_message_callback: MessageCallback | None = None, +azure_service_bus_conn_id: str = "azure_service_bus_default", +**kwargs, +) -> None: +super().__init__(**kwargs) +self.request_queue_name = request_queue_name +self.request_body_generator = request_body_generator +self.reply_topic_name = reply_topic_name +self.reply_correlation_id = reply_correlation_id +
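The `max_wait_time` parameter in the docstring bounds how long the operator waits for a reply. The sketch below shows one generic way to enforce such a bound with a monotonic deadline. It is not taken from the PR: `wait_for_reply`, `poll`, and `poll_interval` are hypothetical names, and a real receiver would more likely block on the SDK's own receive timeout than poll.

```python
import time


def wait_for_reply(poll, max_wait_time=60.0, poll_interval=0.01):
    """Call poll() until it returns a reply or max_wait_time seconds elapse."""
    # time.monotonic() is unaffected by wall-clock adjustments during the wait.
    deadline = time.monotonic() + max_wait_time
    while True:
        reply = poll()
        if reply is not None:
            return reply
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"no reply within {max_wait_time} seconds")
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))
```

With the docstring's guidance of roughly 3x the expected processing time, a service that normally answers in 20 seconds would be called with `max_wait_time=60`, which matches the default.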
Re: [PR] Add request-reply operator to Microsoft Azure provider [airflow]
perry2of5 commented on PR #44675: URL: https://github.com/apache/airflow/pull/44675#issuecomment-2518981249 This is ready for review.