perry2of5 opened a new pull request, #44675:
URL: https://github.com/apache/airflow/pull/44675

   This PR adds a request-reply operator that implements the Request-Reply design pattern from Enterprise Integration Patterns (Hohpe and Woolf, Addison-Wesley, 2003).
   
   In particular, this means one could:
   a) Create a Service Bus queue and topic for a batch process.
   b) Set up an auto-scaling Azure Container App Job that listens to the Azure Service Bus queue for messages.
   c) Create a DAG that uses the request-reply operator to start the Azure Container App Job and capture the reply when it finishes.
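
The flow in steps a) to c) can be sketched with in-memory queues standing in for the Service Bus queue and topic. This is only an illustration of the request-reply pattern, not the operator's implementation: `request_queue`, `reply_topic`, `worker`, `request_reply`, and the `correlation_id` field are all hypothetical names, and matching replies by correlation ID is an assumption about how a reply is paired with its request.

```python
import json
import queue
import threading
import time
import uuid

# In-memory stand-ins (assumptions) for the Service Bus request queue and reply topic.
request_queue = queue.Queue()
reply_topic = queue.Queue()


def worker():
    """Stand-in for the container job: consume one request, publish one reply."""
    request = json.loads(request_queue.get())
    reply = {"correlation_id": request["correlation_id"], "status": 200}
    reply_topic.put(json.dumps(reply))


def request_reply(body, max_wait_time):
    """Send a request and block until the matching reply arrives or time runs out."""
    correlation_id = str(uuid.uuid4())
    request_queue.put(json.dumps({**body, "correlation_id": correlation_id}))
    deadline = time.monotonic() + max_wait_time
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("no reply within max_wait_time")
        try:
            reply = json.loads(reply_topic.get(timeout=remaining))
        except queue.Empty:
            raise TimeoutError("no reply within max_wait_time") from None
        # Ignore replies that belong to some other request.
        if reply["correlation_id"] == correlation_id:
            return reply


# Usage: start the fake job, send one request, and wait for its reply.
threading.Thread(target=worker, daemon=True).start()
reply = request_reply(
    {"method": "GET", "url": "http://example.com/index.html"}, max_wait_time=5
)
print(reply["status"])
```

The caller blocks for up to `max_wait_time` seconds, which mirrors the way the operator currently occupies a worker while the remote job runs.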
   
   Potential improvements:
   * Have the operator wait in the background for the reply, so that a worker thread is not tied up while the remote process runs.
   * Provide more parameters to control the subscription to the reply topic. Right now the subscription deregisters itself if it is not used for 6 hours, and it drops messages after 1 hour. This seems reasonable to me, since the subscription should only exist for the life of the operator, but more configuration might help some use case I haven't thought of.
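
As a rough sketch of what configurable subscription lifetimes could look like, the two durations mentioned above can be held in a small settings object. The class `ReplySubscriptionSettings` and its method `as_kwargs` are hypothetical and not part of this PR; the keyword names are chosen to match the `auto_delete_on_idle` and `default_message_time_to_live` arguments of `create_subscription` in the azure-servicebus management client, on the assumption that the operator would pass them through.

```python
from dataclasses import dataclass
from datetime import timedelta


@dataclass(frozen=True)
class ReplySubscriptionSettings:
    """Hypothetical holder for the reply subscription lifetimes described above."""

    # Delete the subscription after it has been idle this long (currently 6 hours).
    auto_delete_on_idle: timedelta = timedelta(hours=6)
    # Drop undelivered reply messages after this long (currently 1 hour).
    default_message_time_to_live: timedelta = timedelta(hours=1)

    def as_kwargs(self):
        """Return the settings as keyword arguments for a subscription-creation call."""
        return {
            "auto_delete_on_idle": self.auto_delete_on_idle,
            "default_message_time_to_live": self.default_message_time_to_live,
        }


settings = ReplySubscriptionSettings()
# With azure-servicebus installed, these could be passed along the lines of
# (assumption, not shown in this PR):
#   admin_client.create_subscription(topic_name, subscription_name, **settings.as_kwargs())
print(settings.as_kwargs())
```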
   
   
   A working Azure Container App Job can be built using scripts/event-job-aca.zsh in the repo https://github.com/perry2of5/http-file-rtrvr
   
   A working DAG is provided below:
   
   ```
   from datetime import datetime
   from airflow import DAG
   from airflow.utils.context import Context
   from airflow.operators.python import PythonOperator
   from airflow.providers.microsoft.azure.operators.asb import (
       AzureServiceBusRequestReplyOperator,
   )
   from azure.servicebus import ServiceBusMessage
   import json
   
   dag = DAG(
       'test-http-req-reply-dag',
       description='Test sending message to HTTP download service',
       schedule_interval='0 12 * * *',
       start_date=datetime(2017, 3, 20),
       catchup=False,
   )
   
   
   def print_hello():
       return 'Hello world from first Airflow DAG!'
   
   
   def body_generator(context: Context):
       # Define the request body here
       return '''
           {
               "method": "GET",
               "url": "http://example.com/index.html",
               "save_to": "example/dag/1",
               "timeout_seconds": 5
           }
           '''
   
   
   def process_reply(message: ServiceBusMessage, context: Context):
       # Process the reply message here
       print(f"Received reply: {message}")
       body = json.loads(str(message))
       context['ti'].xcom_push(key='URL', value=body['saved_to_fqn'])
       context['ti'].xcom_push(key='STATUS_CODE', value=body['status'])
   
   
   def print_url(**context):
       url = context['ti'].xcom_pull(task_ids='send_request', key='URL')
       print('url:', url)
   
   
   def print_status_code(**context):
       status_code = context['ti'].xcom_pull(task_ids='send_request', 
key='STATUS_CODE')
       print("status_code", status_code)
   
   
   hello_operator = PythonOperator(task_id='hello_task', dag=dag, 
python_callable=print_hello)
   
   send_request = AzureServiceBusRequestReplyOperator(
           task_id='send_request',
           dag=dag,
           request_queue_name="file-rtrvr-request",
           request_body_generator=body_generator,
           reply_topic_name="file-rtrvr-complete",
           # 6 minutes; the Azure Container App Job polls for messages every 5 minutes
           max_wait_time=360,
           reply_message_callback=process_reply,
           azure_service_bus_conn_id="azure_service_bus_default",
   )
   
   status_operator = PythonOperator(
       task_id='print_status_task',
       dag=dag,
       python_callable=print_status_code,
   )
   
   url_operator = PythonOperator(
       task_id='done',
       dag=dag,
       python_callable=print_url,
   )
   
   
   hello_operator >> send_request >> url_operator >> status_operator
   ```
   

