perry2of5 opened a new pull request, #44675:
URL: https://github.com/apache/airflow/pull/44675
This PR adds a request-reply operator that implements the Request-Reply pattern from
Enterprise Integration Patterns (Hohpe and Woolf, Addison-Wesley, 2003).
In particular, this makes it possible to:
a) create an Azure Service Bus queue and topic for a batch process;
b) set up an auto-scaling Azure Container App Job that listens to the Service
Bus queue for messages; and
c) create a DAG that uses the request-reply operator to start the Azure
Container App Job and capture its reply when the job finishes.
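The flow in steps a) to c) can be illustrated without Azure at all. The following is a minimal in-memory sketch, assuming stdlib `queue.Queue` objects stand in for the Service Bus request queue and for the operator's subscription on the reply topic, and assuming replies are matched to requests by a correlation id (the Correlation Identifier idea from the same book; whether the operator matches replies this way is not stated here). The `responder` function and its reply fields `saved_to_fqn` and `status` (with a fixed 200) are hypothetical, modeled on the DAG's `process_reply` callback; `max_wait_time` mirrors the operator parameter of the same name, in seconds.

```python
import json
import queue
import threading
import time
import uuid

# In-memory stand-ins (hypothetical) for the Service Bus request queue and
# for the operator's subscription on the reply topic.
request_queue = queue.Queue()
reply_channel = queue.Queue()


def responder() -> None:
    """Plays the batch job: consume one request and publish one reply."""
    request = request_queue.get()
    payload = json.loads(request["body"])
    # Hypothetical reply fields, modeled on the DAG's process_reply callback.
    reply_body = json.dumps({"saved_to_fqn": payload["save_to"], "status": 200})
    # Echo the request's id as the correlation id so the requestor can match it.
    reply_channel.put({"correlation_id": request["message_id"], "body": reply_body})


def request_reply(body: str, max_wait_time: float) -> dict:
    """Send one request, then wait up to max_wait_time seconds for its reply."""
    message_id = str(uuid.uuid4())
    request_queue.put({"message_id": message_id, "body": body})
    deadline = time.monotonic() + max_wait_time
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"no reply within {max_wait_time} s")
        try:
            reply = reply_channel.get(timeout=remaining)
        except queue.Empty:
            raise TimeoutError(f"no reply within {max_wait_time} s") from None
        if reply["correlation_id"] == message_id:
            return json.loads(reply["body"])
        # Replies to other requests are simply discarded in this sketch.


if __name__ == "__main__":
    threading.Thread(target=responder, daemon=True).start()
    request = json.dumps({"method": "GET", "url": "http://example.com/index.html",
                          "save_to": "example/dag/1", "timeout_seconds": 5})
    print(request_reply(request, max_wait_time=5))
    # prints {'saved_to_fqn': 'example/dag/1', 'status': 200}
```

Note that `request_reply` blocks its caller for up to `max_wait_time`; that blocking wait is what ties up a worker while the remote job runs.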
Potential improvements:
* Have the operator run in the background (for example, as a deferrable
operator) while waiting for a reply. There is no need to tie up a worker while
a remote process runs.
* Provide more parameters to control the subscription to the reply topic.
Currently the subscription deletes itself after 6 hours without use and drops
messages after 1 hour. This seems reasonable to me, since the subscription
should only exist for the life of the operator, but more configuration might
help a use case I haven't thought of.
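One way to expose those subscription settings is a thin wrapper whose defaults match the current behavior (auto-delete after 6 idle hours, 1-hour message time-to-live) but which accepts overrides. This is a sketch under assumptions: `create_reply_subscription` is a hypothetical helper, not part of the provider, and `admin` is assumed to behave like azure-servicebus's `ServiceBusAdministrationClient`, whose `create_subscription(topic_name, subscription_name, **kwargs)` accepts `auto_delete_on_idle` and `default_message_time_to_live` as keyword options. The client is passed in, so the sketch itself runs without the SDK installed.

```python
from datetime import timedelta
from typing import Any, Protocol


class SubscriptionAdmin(Protocol):
    """Structural type for the single admin-client method this sketch calls."""

    def create_subscription(self, topic_name: str, subscription_name: str, **kwargs: Any) -> Any: ...


def create_reply_subscription(
    admin: SubscriptionAdmin,
    topic_name: str,
    subscription_name: str,
    # Defaults mirror the behavior described in the PR; both are assumptions
    # about how the operator would expose them.
    auto_delete_on_idle: timedelta = timedelta(hours=6),
    default_message_time_to_live: timedelta = timedelta(hours=1),
    **extra: Any,
) -> Any:
    """Hypothetical helper: create the reply subscription with overridable lifetimes."""
    return admin.create_subscription(
        topic_name,
        subscription_name,
        auto_delete_on_idle=auto_delete_on_idle,
        default_message_time_to_live=default_message_time_to_live,
        **extra,  # any further subscription options are passed through unchanged
    )
```

With the real SDK, `admin` could be a client created with `ServiceBusAdministrationClient.from_connection_string(...)`; keyword options such as `max_delivery_count` would pass straight through `**extra`.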
A working Azure Container App Job can be built with the
`scripts/event-job-aca.zsh` script in the repository
https://github.com/perry2of5/http-file-rtrvr
A working DAG is provided below:
```python
import json
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.azure.operators.asb import (
    AzureServiceBusRequestReplyOperator,
)
from airflow.utils.context import Context
from azure.servicebus import ServiceBusMessage

dag = DAG(
    "test-http-req-reply-dag",
    description="Test sending message to HTTP download service",
    schedule="0 12 * * *",
    start_date=datetime(2017, 3, 20),
    catchup=False,
)


def print_hello():
    return "Hello world from first Airflow DAG!"


def body_generator(context: Context) -> str:
    # Build the JSON request body sent to the request queue.
    return json.dumps(
        {
            "method": "GET",
            "url": "http://example.com/index.html",
            "save_to": "example/dag/1",
            "timeout_seconds": 5,
        }
    )


def process_reply(message: ServiceBusMessage, context: Context) -> None:
    # Parse the JSON reply and expose its fields to downstream tasks via XCom.
    print(f"Received reply: {message}")
    body = json.loads(str(message))
    context["ti"].xcom_push(key="URL", value=body["saved_to_fqn"])
    context["ti"].xcom_push(key="STATUS_CODE", value=body["status"])


def print_url(**context):
    url = context["ti"].xcom_pull(task_ids="send_request", key="URL")
    print("url:", url)


def print_status_code(**context):
    status_code = context["ti"].xcom_pull(task_ids="send_request", key="STATUS_CODE")
    print("status_code:", status_code)


hello_operator = PythonOperator(task_id="hello_task", dag=dag, python_callable=print_hello)

send_request = AzureServiceBusRequestReplyOperator(
    task_id="send_request",
    dag=dag,
    request_queue_name="file-rtrvr-request",
    request_body_generator=body_generator,
    reply_topic_name="file-rtrvr-complete",
    max_wait_time=360,  # 6 minutes; the Azure Container App Job polls for messages every 5 minutes
    reply_message_callback=process_reply,
    azure_service_bus_conn_id="azure_service_bus_default",
)

# Airflow 2 passes the context to **context callables automatically,
# so the deprecated provide_context=True argument is not needed.
status_operator = PythonOperator(
    task_id="print_status_task",
    dag=dag,
    python_callable=print_status_code,
)

url_operator = PythonOperator(
    task_id="done",
    dag=dag,
    python_callable=print_url,
)

hello_operator >> send_request >> url_operator >> status_operator
```