This is an automated email from the ASF dual-hosted git repository.

jasonliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c1a648124fd Azure Message Bus - CommonMessageQueue Interface (#52712) 
(#61924)
c1a648124fd is described below

commit c1a648124fd6da6ebbee4612d674cea08f3cedfd
Author: Aaron Chen <[email protected]>
AuthorDate: Sun Feb 15 05:56:50 2026 -0800

    Azure Message Bus - CommonMessageQueue Interface (#52712) (#61924)
    
    * Azure Message Bus - CommonMessageQueue Interface (#52712)
    
    * fix CI static checks error
---
 docs/spelling_wordlist.txt                         |  1 +
 providers/microsoft/azure/docs/index.rst           | 18 ++---
 .../microsoft/azure/docs/message-queues/index.rst  | 69 +++++++++++++++++++
 providers/microsoft/azure/provider.yaml            |  3 +
 providers/microsoft/azure/pyproject.toml           |  4 ++
 .../providers/microsoft/azure/get_provider_info.py |  1 +
 .../providers/microsoft/azure/queues/__init__.py   | 16 +++++
 .../providers/microsoft/azure/queues/asb.py        | 62 +++++++++++++++++
 .../microsoft/azure/example_event_schedule_asb.py  | 74 ++++++++++++++++++++
 .../tests/unit/microsoft/azure/queues/__init__.py  | 16 +++++
 .../tests/unit/microsoft/azure/queues/test_asb.py  | 78 ++++++++++++++++++++++
 11 files changed, 334 insertions(+), 8 deletions(-)

diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index c749e02473c..c25fecbca3d 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -94,6 +94,7 @@ arraysize
 artifactId
 Asana
 asana
+asb
 asc
 ascii
 asciiart
diff --git a/providers/microsoft/azure/docs/index.rst 
b/providers/microsoft/azure/docs/index.rst
index c1c3168cc26..6aa0bfe4483 100644
--- a/providers/microsoft/azure/docs/index.rst
+++ b/providers/microsoft/azure/docs/index.rst
@@ -35,6 +35,7 @@
     :caption: Guides
 
     Connection types <connections/index>
+    Message queues <message-queues/index>
     Operators <operators/index>
     Transfers <transfer/index>
     Filesystems <filesystems/index>
@@ -151,14 +152,15 @@ You can install such cross-provider dependencies when 
installing from PyPI. For
     pip install apache-airflow-providers-microsoft-azure[amazon]
 
 
-==================================================================================================================
  =================
-Dependent package                                                              
                                     Extra
-==================================================================================================================
  =================
-`apache-airflow-providers-amazon 
<https://airflow.apache.org/docs/apache-airflow-providers-amazon>`_             
   ``amazon``
-`apache-airflow-providers-common-compat 
<https://airflow.apache.org/docs/apache-airflow-providers-common-compat>`_  
``common.compat``
-`apache-airflow-providers-oracle 
<https://airflow.apache.org/docs/apache-airflow-providers-oracle>`_             
   ``oracle``
-`apache-airflow-providers-sftp 
<https://airflow.apache.org/docs/apache-airflow-providers-sftp>`_               
     ``sftp``
-==================================================================================================================
  =================
+========================================================================================================================
  ====================
+Dependent package                                                              
                                           Extra
+========================================================================================================================
  ====================
+`apache-airflow-providers-amazon 
<https://airflow.apache.org/docs/apache-airflow-providers-amazon>`_             
         ``amazon``
+`apache-airflow-providers-common-compat 
<https://airflow.apache.org/docs/apache-airflow-providers-common-compat>`_      
  ``common.compat``
+`apache-airflow-providers-common-messaging 
<https://airflow.apache.org/docs/apache-airflow-providers-common-messaging>`_  
``common.messaging``
+`apache-airflow-providers-oracle 
<https://airflow.apache.org/docs/apache-airflow-providers-oracle>`_             
         ``oracle``
+`apache-airflow-providers-sftp 
<https://airflow.apache.org/docs/apache-airflow-providers-sftp>`_               
           ``sftp``
+========================================================================================================================
  ====================
 
 Downloading official packages
 -----------------------------
diff --git a/providers/microsoft/azure/docs/message-queues/index.rst 
b/providers/microsoft/azure/docs/message-queues/index.rst
new file mode 100644
index 00000000000..a0a9fba9ac2
--- /dev/null
+++ b/providers/microsoft/azure/docs/message-queues/index.rst
@@ -0,0 +1,69 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Azure Service Bus Message Queue
+================================
+
+.. contents::
+   :local:
+   :depth: 2
+
+Azure Service Bus Queue Provider
+---------------------------------
+
+Implemented by 
:class:`~airflow.providers.microsoft.azure.queues.asb.AzureServiceBusMessageQueueProvider`
+
+The Azure Service Bus Queue Provider is a 
:class:`~airflow.providers.common.messaging.providers.base_provider.BaseMessageQueueProvider`
 that uses
+Azure Service Bus as the underlying message queue system.
+It allows you to send and receive messages using Azure Service Bus queues in 
your Airflow workflows with the 
:class:`~airflow.providers.common.messaging.triggers.msg_queue.MessageQueueTrigger`
 common message queue interface.
+
+
+.. include:: /../src/airflow/providers/microsoft/azure/queues/asb.py
+    :start-after: [START azure_servicebus_message_queue_provider_description]
+    :end-before: [END azure_servicebus_message_queue_provider_description]
+
+Azure Service Bus Message Queue Trigger
+-----------------------------------------
+
+Implemented by 
:class:`~airflow.providers.microsoft.azure.triggers.message_bus.AzureServiceBusQueueTrigger`
+
+Inherited from 
:class:`~airflow.providers.common.messaging.triggers.msg_queue.MessageQueueTrigger`
+
+Wait for a message in a queue
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Below is an example of how you can configure an Airflow DAG to be triggered by 
a message in Azure Service Bus.
+
+.. exampleinclude:: 
/../tests/system/microsoft/azure/example_event_schedule_asb.py
+    :language: python
+    :start-after: [START howto_trigger_asb_message_queue]
+    :end-before: [END howto_trigger_asb_message_queue]
+
+How it works
+------------
+
+1. **Azure Service Bus Message Queue Trigger**: The 
``AzureServiceBusQueueTrigger`` listens for messages from Azure Service Bus 
queue(s).
+
+2. **Asset and Watcher**: The ``Asset`` abstracts the external entity, the 
Azure Service Bus queue in this example.
+   The ``AssetWatcher`` associates a trigger with a name. This name helps you 
identify which trigger is associated with which
+   asset.
+
+3. **Event-Driven DAG**: Instead of running on a fixed schedule, the DAG 
executes when the asset receives an update
+   (e.g., a new message in the queue).
+
+For how to use the trigger, refer to the documentation of the
+:ref:`Messaging Trigger <howto/trigger:MessageQueueTrigger>`.
diff --git a/providers/microsoft/azure/provider.yaml 
b/providers/microsoft/azure/provider.yaml
index 2b85758f432..6cb3921ebcd 100644
--- a/providers/microsoft/azure/provider.yaml
+++ b/providers/microsoft/azure/provider.yaml
@@ -304,6 +304,9 @@ triggers:
     python-modules:
       - airflow.providers.microsoft.azure.triggers.message_bus
 
+queues:
+  - 
airflow.providers.microsoft.azure.queues.asb.AzureServiceBusMessageQueueProvider
+
 transfers:
   - source-integration-name: Local
     target-integration-name: Microsoft Azure Data Lake Storage
diff --git a/providers/microsoft/azure/pyproject.toml 
b/providers/microsoft/azure/pyproject.toml
index 50eab1ad831..00ca93d8063 100644
--- a/providers/microsoft/azure/pyproject.toml
+++ b/providers/microsoft/azure/pyproject.toml
@@ -107,6 +107,9 @@ dependencies = [
 "sftp" = [
     "apache-airflow-providers-sftp"
 ]
+"common.messaging" = [
+    "apache-airflow-providers-common-messaging>=2.0.0"
+]
 
 [dependency-groups]
 dev = [
@@ -115,6 +118,7 @@ dev = [
     "apache-airflow-devel-common",
     "apache-airflow-providers-amazon",
     "apache-airflow-providers-common-compat",
+    "apache-airflow-providers-common-messaging",
     "apache-airflow-providers-oracle",
     "apache-airflow-providers-sftp",
     # Additional devel dependencies (do not remove this line and add extra 
development dependencies)
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/get_provider_info.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/get_provider_info.py
index 1966efa4fcb..356ea5d13ef 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/get_provider_info.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/get_provider_info.py
@@ -286,6 +286,7 @@ def get_provider_info():
                 "python-modules": 
["airflow.providers.microsoft.azure.triggers.message_bus"],
             },
         ],
+        "queues": 
["airflow.providers.microsoft.azure.queues.asb.AzureServiceBusMessageQueueProvider"],
         "transfers": [
             {
                 "source-integration-name": "Local",
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/queues/__init__.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/queues/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/queues/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/queues/asb.py 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/queues/asb.py
new file mode 100644
index 00000000000..867be6693f7
--- /dev/null
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/queues/asb.py
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from airflow.providers.common.messaging.providers.base_provider import 
BaseMessageQueueProvider
+from airflow.providers.microsoft.azure.triggers.message_bus import 
AzureServiceBusQueueTrigger
+
+if TYPE_CHECKING:
+    from airflow.triggers.base import BaseEventTrigger
+
+
+class AzureServiceBusMessageQueueProvider(BaseMessageQueueProvider):
+    """
+    Configuration for Azure Service Bus integration with common-messaging.
+
+    [START azure_servicebus_message_queue_provider_description]
+
+    * It uses ``azure+servicebus`` as the scheme for identifying the provider.
+    * For parameter definitions, take a look at
+      
:class:`~airflow.providers.microsoft.azure.triggers.message_bus.AzureServiceBusQueueTrigger`.
+
+    .. code-block:: python
+
+        from airflow.providers.common.messaging.triggers.msg_queue import 
MessageQueueTrigger
+        from airflow.sdk import Asset, AssetWatcher
+
+        trigger = MessageQueueTrigger(
+            scheme="azure+servicebus",
+            # AzureServiceBusQueueTrigger parameters
+            queues=["my-queue"],
+            azure_service_bus_conn_id="azure_service_bus_default",
+            poll_interval=60,
+        )
+
+        asset = Asset(
+            "asb_queue_asset",
+            watchers=[AssetWatcher(name="asb_watcher", trigger=trigger)],
+        )
+
+    [END azure_servicebus_message_queue_provider_description]
+    """
+
+    scheme = "azure+servicebus"
+
+    def trigger_class(self) -> type[BaseEventTrigger]:
+        return AzureServiceBusQueueTrigger
diff --git 
a/providers/microsoft/azure/tests/system/microsoft/azure/example_event_schedule_asb.py
 
b/providers/microsoft/azure/tests/system/microsoft/azure/example_event_schedule_asb.py
new file mode 100644
index 00000000000..9d95040ab33
--- /dev/null
+++ 
b/providers/microsoft/azure/tests/system/microsoft/azure/example_event_schedule_asb.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates using Azure Service Bus with 
MessageQueueTrigger
+and Asset Watchers for event-driven workflows.
+
+This example shows how to create a DAG that triggers when messages arrive in an
+Azure Service Bus queue using Asset Watchers.
+
+Prerequisites
+-------------
+
+Before running this example, ensure you have:
+
+1. An Azure Service Bus namespace
+2. A queue created in the namespace (e.g., ``my-queue``)
+3. An Airflow connection configured with ID ``azure_service_bus_default``
+
+How to test
+-----------
+
+1. Ensure the Azure Service Bus queue exists (see Prerequisites above)
+2. Send a message to the queue to trigger the DAG
+3. The DAG will be triggered automatically when the message arrives
+"""
+
+from __future__ import annotations
+
+# [START howto_trigger_asb_message_queue]
+from airflow.providers.common.messaging.triggers.msg_queue import 
MessageQueueTrigger
+from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG, Asset, AssetWatcher
+
+# Define a trigger that listens to an Azure Service Bus queue
+trigger = MessageQueueTrigger(
+    scheme="azure+servicebus",
+    queues=["my-queue"],
+    azure_service_bus_conn_id="azure_service_bus_default",
+    poll_interval=60,
+)
+
+# Define an asset that watches for messages on the Azure Service Bus queue
+asset = Asset(
+    "event_schedule_asb_asset_1",
+    watchers=[AssetWatcher(name="event_schedule_asb_watcher_1", 
trigger=trigger)],
+)
+
+with DAG(
+    dag_id="example_event_schedule_asb",
+    schedule=[asset],
+) as dag:
+    process_message_task = EmptyOperator(task_id="process_asb_message")
+# [END howto_trigger_asb_message_queue]
+
+
+from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git 
a/providers/microsoft/azure/tests/unit/microsoft/azure/queues/__init__.py 
b/providers/microsoft/azure/tests/unit/microsoft/azure/queues/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/microsoft/azure/tests/unit/microsoft/azure/queues/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/providers/microsoft/azure/tests/unit/microsoft/azure/queues/test_asb.py 
b/providers/microsoft/azure/tests/unit/microsoft/azure/queues/test_asb.py
new file mode 100644
index 00000000000..01e246bb234
--- /dev/null
+++ b/providers/microsoft/azure/tests/unit/microsoft/azure/queues/test_asb.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import patch
+
+import pytest
+
+from airflow.providers.microsoft.azure.triggers.message_bus import 
AzureServiceBusQueueTrigger
+
+from tests_common.test_utils.common_msg_queue import mark_common_msg_queue_test
+
+pytest.importorskip("airflow.providers.common.messaging.providers.base_provider")
+
+
+class TestAzureServiceBusMessageQueueProvider:
+    """Tests for AzureServiceBusMessageQueueProvider."""
+
+    def setup_method(self):
+        """Set up the test environment."""
+        from airflow.providers.microsoft.azure.queues.asb import 
AzureServiceBusMessageQueueProvider
+
+        self.provider = AzureServiceBusMessageQueueProvider()
+
+    def test_queue_create(self):
+        """Test the creation of the AzureServiceBusMessageQueueProvider."""
+        from airflow.providers.common.messaging.providers.base_provider import 
BaseMessageQueueProvider
+
+        assert isinstance(self.provider, BaseMessageQueueProvider)
+
+    @pytest.mark.parametrize(
+        ("scheme", "expected_result"),
+        [
+            pytest.param("azure+servicebus", True, 
id="azure_servicebus_scheme"),
+            pytest.param("sqs", False, id="sqs_scheme"),
+            pytest.param("kafka", False, id="kafka_scheme"),
+            pytest.param("redis+pubsub", False, id="redis_pubsub_scheme"),
+            pytest.param("google+pubsub", False, id="google_pubsub_scheme"),
+            pytest.param("azure", False, id="azure_only_scheme"),
+            pytest.param("unknown", False, id="unknown_scheme"),
+        ],
+    )
+    def test_scheme_matches(self, scheme, expected_result):
+        """Test the scheme_matches method with various schemes."""
+        assert self.provider.scheme_matches(scheme) == expected_result
+
+    def test_trigger_class(self):
+        """Test the trigger_class method."""
+        assert self.provider.trigger_class() == AzureServiceBusQueueTrigger
+
+
+@mark_common_msg_queue_test
+class TestMessageQueueTriggerIntegration:
+    """Test integration with MessageQueueTrigger framework."""
+
+    @pytest.mark.usefixtures("cleanup_providers_manager")
+    def test_provider_integrations_with_scheme_param(self):
+        """Test that MessageQueueTrigger correctly resolves to 
AzureServiceBusQueueTrigger."""
+        from airflow.providers.common.messaging.triggers.msg_queue import 
MessageQueueTrigger
+        from airflow.providers.microsoft.azure.triggers.message_bus import 
AzureServiceBusQueueTrigger
+
+        with 
patch("airflow.providers.microsoft.azure.triggers.message_bus.MessageHook"):
+            trigger = MessageQueueTrigger(scheme="azure+servicebus", 
queues=["test_queue"])
+            assert isinstance(trigger.trigger, AzureServiceBusQueueTrigger)

Reply via email to