This is an automated email from the ASF dual-hosted git repository.

jasonliu pushed a commit to branch 
revert-54494-feat/pubsub-message-queue-provider
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 335d16ebcd14103c199d02734aaf988cdf8ca778
Author: LIU ZHE YOU <[email protected]>
AuthorDate: Mon Oct 6 17:48:16 2025 +0800

    Revert "AIP-82: implement Google Pub/Sub message queue provider (#54494)"
    
    This reverts commit a9d9736863b45f6d37398605e23ea712bef2628a.
---
 dev/breeze/tests/test_selective_checks.py          |  8 +-
 providers/google/docs/index.rst                    |  1 -
 providers/google/docs/message-queues/index.rst     | 70 ----------------
 providers/google/provider.yaml                     |  3 -
 providers/google/pyproject.toml                    |  4 -
 .../providers/google/cloud/queues/__init__.py      | 16 ----
 .../providers/google/cloud/queues/pubsub.py        | 68 ----------------
 .../airflow/providers/google/get_provider_info.py  |  1 -
 .../pubsub/example_pubsub_message_queue_trigger.py | 93 ----------------------
 .../tests/unit/google/cloud/queues/__init__.py     | 16 ----
 .../tests/unit/google/cloud/queues/test_pubsub.py  | 45 -----------
 11 files changed, 4 insertions(+), 321 deletions(-)

diff --git a/dev/breeze/tests/test_selective_checks.py 
b/dev/breeze/tests/test_selective_checks.py
index f0253231279..5c00a026921 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -1847,7 +1847,7 @@ def test_expected_output_push(
             ),
             {
                 "selected-providers-list-as-string": "amazon apache.beam 
apache.cassandra apache.kafka "
-                "cncf.kubernetes common.compat common.messaging common.sql "
+                "cncf.kubernetes common.compat common.sql "
                 "facebook google hashicorp http microsoft.azure 
microsoft.mssql mysql "
                 "openlineage oracle postgres presto salesforce samba sftp ssh 
trino",
                 "all-python-versions": 
f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
@@ -1859,7 +1859,7 @@ def test_expected_output_push(
                 "skip-providers-tests": "false",
                 "docs-build": "true",
                 "docs-list-as-string": "apache-airflow helm-chart amazon 
apache.beam apache.cassandra "
-                "apache.kafka cncf.kubernetes common.compat common.messaging 
common.sql facebook google hashicorp http microsoft.azure "
+                "apache.kafka cncf.kubernetes common.compat common.sql 
facebook google hashicorp http microsoft.azure "
                 "microsoft.mssql mysql openlineage oracle postgres "
                 "presto salesforce samba sftp ssh trino",
                 "skip-prek-hooks": ALL_SKIPPED_COMMITS_IF_NO_UI,
@@ -1873,7 +1873,7 @@ def test_expected_output_push(
                         {
                             "description": "amazon...google",
                             "test_types": "Providers[amazon] 
Providers[apache.beam,apache.cassandra,"
-                            
"apache.kafka,cncf.kubernetes,common.compat,common.messaging,common.sql,facebook,"
+                            
"apache.kafka,cncf.kubernetes,common.compat,common.sql,facebook,"
                             
"hashicorp,http,microsoft.azure,microsoft.mssql,mysql,"
                             
"openlineage,oracle,postgres,presto,salesforce,samba,sftp,ssh,trino] "
                             "Providers[google]",
@@ -2117,7 +2117,7 @@ def test_upgrade_to_newer_dependencies(
             ("providers/google/docs/some_file.rst",),
             {
                 "docs-list-as-string": "amazon apache.beam apache.cassandra 
apache.kafka "
-                "cncf.kubernetes common.compat common.messaging common.sql 
facebook google hashicorp http "
+                "cncf.kubernetes common.compat common.sql facebook google 
hashicorp http "
                 "microsoft.azure microsoft.mssql mysql openlineage oracle "
                 "postgres presto salesforce samba sftp ssh trino",
             },
diff --git a/providers/google/docs/index.rst b/providers/google/docs/index.rst
index 24db6cefe81..3ca89dba953 100644
--- a/providers/google/docs/index.rst
+++ b/providers/google/docs/index.rst
@@ -36,7 +36,6 @@
 
     Connection types <connections/index>
     Logging handlers <logging/index>
-    Message queues <message-queues/index>
     Secrets backends <secrets-backends/google-cloud-secret-manager-backend>
     API Authentication backend <api-auth-backend/google-openid>
     Operators <operators/index>
diff --git a/providers/google/docs/message-queues/index.rst 
b/providers/google/docs/message-queues/index.rst
deleted file mode 100644
index dcc30c55713..00000000000
--- a/providers/google/docs/message-queues/index.rst
+++ /dev/null
@@ -1,70 +0,0 @@
- .. Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
- ..   http://www.apache.org/licenses/LICENSE-2.0
-
- .. Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
-Google Cloud Messaging Queues
-==============================
-
-.. contents::
-   :local:
-   :depth: 2
-
-Google Cloud Pub/Sub Queue Provider
-------------------------------------
-
-Implemented by 
:class:`~airflow.providers.google.cloud.queues.pubsub.PubsubMessageQueueProvider`
-
-The Google Cloud Pub/Sub Queue Provider is a message queue provider that uses 
Google Cloud Pub/Sub as the underlying message queue system.
-
-It allows you to send and receive messages using Cloud Pub/Sub in your Airflow 
workflows
-with the 
:class:`~airflow.providers.common.messaging.triggers.msg_queue.MessageQueueTrigger`
 common message queue interface.
-
-.. include:: /../src/airflow/providers/google/cloud/queues/pubsub.py
-    :start-after: [START pubsub_message_queue_provider_description]
-    :end-before: [END pubsub_message_queue_provider_description]
-
-Pub/Sub Message Queue Trigger
------------------------------
-
-Implemented by 
:class:`~airflow.providers.google.cloud.triggers.pubsub.PubsubPullTrigger`
-
-Inherited from 
:class:`~airflow.providers.common.messaging.triggers.msg_queue.MessageQueueTrigger`
-
-
-Wait for a message in a queue
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Below is an example of how you can configure an Airflow DAG to be triggered by 
a message in Pub/Sub.
-
-.. exampleinclude:: 
/../tests/system/google/cloud/pubsub/example_pubsub_message_queue_trigger.py
-    :language: python
-    :start-after: [START howto_trigger_pubsub_message_queue]
-    :end-before: [END howto_trigger_pubsub_message_queue]
-
-How it works
-------------
-
-1. **Pub/Sub Message Queue Trigger**: The ``PubsubPullTrigger`` listens for 
messages from a Google Cloud Pub/Sub subscription.
-
-2. **Asset and Watcher**: The ``Asset`` abstracts the external entity, the 
Pub/Sub subscription in this example.
-   The ``AssetWatcher`` associates a trigger with a name. This name helps you 
identify which trigger is associated with which
-   asset.
-
-3. **Event-Driven DAG**: Instead of running on a fixed schedule, the DAG 
executes when the asset receives an update
-   (e.g., a new message in the queue).
-
-For how to use the trigger, refer to the documentation of the
-:ref:`Messaging Trigger <howto/trigger:MessageQueueTrigger>`
diff --git a/providers/google/provider.yaml b/providers/google/provider.yaml
index 6005f302c17..fc04622e4a4 100644
--- a/providers/google/provider.yaml
+++ b/providers/google/provider.yaml
@@ -1257,6 +1257,3 @@ auth-backends:
 logging:
   - airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler
   - 
airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler
-
-queues:
-  - airflow.providers.google.cloud.queues.pubsub.PubsubMessageQueueProvider
diff --git a/providers/google/pyproject.toml b/providers/google/pyproject.toml
index e5f0e86acef..0bb82701d12 100644
--- a/providers/google/pyproject.toml
+++ b/providers/google/pyproject.toml
@@ -203,9 +203,6 @@ dependencies = [
 "http" = [
     "apache-airflow-providers-http"
 ]
-"common.messaging" = [
-    "apache-airflow-providers-common-messaging"
-]
 
 [dependency-groups]
 dev = [
@@ -217,7 +214,6 @@ dev = [
     "apache-airflow-providers-apache-cassandra",
     "apache-airflow-providers-cncf-kubernetes",
     "apache-airflow-providers-common-compat",
-    "apache-airflow-providers-common-messaging",
     "apache-airflow-providers-common-sql",
     "apache-airflow-providers-facebook",
     "apache-airflow-providers-http",
diff --git 
a/providers/google/src/airflow/providers/google/cloud/queues/__init__.py 
b/providers/google/src/airflow/providers/google/cloud/queues/__init__.py
deleted file mode 100644
index 13a83393a91..00000000000
--- a/providers/google/src/airflow/providers/google/cloud/queues/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git 
a/providers/google/src/airflow/providers/google/cloud/queues/pubsub.py 
b/providers/google/src/airflow/providers/google/cloud/queues/pubsub.py
deleted file mode 100644
index 884cc4d21c1..00000000000
--- a/providers/google/src/airflow/providers/google/cloud/queues/pubsub.py
+++ /dev/null
@@ -1,68 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from typing import TYPE_CHECKING
-
-from airflow.exceptions import AirflowOptionalProviderFeatureException
-from airflow.providers.google.cloud.triggers.pubsub import PubsubPullTrigger
-
-try:
-    from airflow.providers.common.messaging.providers.base_provider import 
BaseMessageQueueProvider
-except ImportError:
-    raise AirflowOptionalProviderFeatureException(
-        "This feature requires the 'common.messaging' provider to be installed 
in version >= 1.0.1."
-    )
-
-if TYPE_CHECKING:
-    from airflow.triggers.base import BaseEventTrigger
-
-
-class PubsubMessageQueueProvider(BaseMessageQueueProvider):
-    """
-    Configuration for PubSub integration with common-messaging.
-
-    [START pubsub_message_queue_provider_description]
-    * It uses ``google+pubsub`` as the scheme for identifying the provider.
-    * For parameter definitions, take a look at 
:class:`~airflow.providers.google.cloud.triggers.pubsub.PubsubPullTrigger`.
-
-    .. code-block:: python
-
-        from airflow.providers.common.messaging.triggers.msg_queue import 
MessageQueueTrigger
-        from airflow.sdk import Asset, AssetWatcher
-
-        trigger = MessageQueueTrigger(
-            scheme="google+pubsub",
-            # Additional PubsubPullTrigger parameters as needed
-            project_id="my_project",
-            subscription="my_subscription",
-            ack_messages=True,
-            max_messages=1,
-            gcp_conn_id="google_cloud_default",
-            poke_interval=60.0,
-        )
-
-        asset = Asset("pubsub_queue_asset", 
watchers=[AssetWatcher(name="pubsub_watcher", trigger=trigger)])
-
-    [END pubsub_message_queue_provider_description]
-
-    """
-
-    scheme = "google+pubsub"
-
-    def trigger_class(self) -> type[BaseEventTrigger]:
-        return PubsubPullTrigger  # type: ignore[return-value]
diff --git a/providers/google/src/airflow/providers/google/get_provider_info.py 
b/providers/google/src/airflow/providers/google/get_provider_info.py
index 72f747c3627..f79dcd19d70 100644
--- a/providers/google/src/airflow/providers/google/get_provider_info.py
+++ b/providers/google/src/airflow/providers/google/get_provider_info.py
@@ -1518,5 +1518,4 @@ def get_provider_info():
             
"airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler",
             
"airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
         ],
-        "queues": 
["airflow.providers.google.cloud.queues.pubsub.PubsubMessageQueueProvider"],
     }
diff --git 
a/providers/google/tests/system/google/cloud/pubsub/example_pubsub_message_queue_trigger.py
 
b/providers/google/tests/system/google/cloud/pubsub/example_pubsub_message_queue_trigger.py
deleted file mode 100644
index e4d92db83ad..00000000000
--- 
a/providers/google/tests/system/google/cloud/pubsub/example_pubsub_message_queue_trigger.py
+++ /dev/null
@@ -1,93 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-Example Airflow DAG that demonstrates using Google Cloud Pub/Sub with 
MessageQueueTrigger
-and Asset Watchers for event-driven workflows.
-
-This example shows how to create a DAG that triggers when messages arrive in a
-Google Cloud Pub/Sub subscription using Asset Watchers.
-
-Prerequisites
--------------
-
-Before running this example, ensure you have:
-
-1. A GCP project with Pub/Sub API enabled
-2. The following Pub/Sub resources created in your project:
-
-   - Topic: ``test-topic``
-   - Subscription: ``test-subscription``
-
-You can create these resources using:
-
-.. code-block:: bash
-
-    # Create topic
-    gcloud pubsub topics create test-topic --project={PROJECT_ID}
-
-    # Create subscription
-    gcloud pubsub subscriptions create test-subscription \\
-        --topic=test-topic --project={PROJECT_ID}
-
-How to test
------------
-
-1. Ensure the Pub/Sub resources exist (see Prerequisites above)
-2. Publish a message to trigger the DAG:
-
-   .. code-block:: bash
-
-       gcloud pubsub topics publish test-topic \\
-           --message="Test message" --project={PROJECT_ID}
-
-3. The DAG will be triggered automatically when the message arrives
-"""
-
-from __future__ import annotations
-
-# [START howto_trigger_pubsub_message_queue]
-from airflow.providers.common.messaging.triggers.msg_queue import 
MessageQueueTrigger
-from airflow.providers.standard.operators.empty import EmptyOperator
-from airflow.sdk import DAG, Asset, AssetWatcher
-
-# Define a trigger that listens to a Google Cloud Pub/Sub subscription
-trigger = MessageQueueTrigger(
-    scheme="google+pubsub",
-    project_id="my-project",
-    subscription="test-subscription",
-    ack_messages=True,
-    max_messages=1,
-    gcp_conn_id="google_cloud_default",
-    poke_interval=60.0,
-)
-
-# Define an asset that watches for messages on the Pub/Sub subscription
-asset = Asset("pubsub_queue_asset_1", 
watchers=[AssetWatcher(name="pubsub_watcher_1", trigger=trigger)])
-
-with DAG(
-    dag_id="example_pubsub_message_queue_trigger",
-    schedule=[asset],
-) as dag:
-    process_message_task = EmptyOperator(task_id="process_pubsub_message")
-# [END howto_trigger_pubsub_message_queue]
-
-
-from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
-
-# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest)
-test_run = get_test_run(dag)
diff --git a/providers/google/tests/unit/google/cloud/queues/__init__.py 
b/providers/google/tests/unit/google/cloud/queues/__init__.py
deleted file mode 100644
index 13a83393a91..00000000000
--- a/providers/google/tests/unit/google/cloud/queues/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/providers/google/tests/unit/google/cloud/queues/test_pubsub.py 
b/providers/google/tests/unit/google/cloud/queues/test_pubsub.py
deleted file mode 100644
index 4b80bae531b..00000000000
--- a/providers/google/tests/unit/google/cloud/queues/test_pubsub.py
+++ /dev/null
@@ -1,45 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import pytest
-
-from airflow.providers.google.cloud.triggers.pubsub import PubsubPullTrigger
-
-pytest.importorskip("airflow.providers.common.messaging.providers.base_provider")
-
-
-def test_message_pubsub_queue_create():
-    from airflow.providers.common.messaging.providers.base_provider import 
BaseMessageQueueProvider
-    from airflow.providers.google.cloud.queues.pubsub import 
PubsubMessageQueueProvider
-
-    provider = PubsubMessageQueueProvider()
-    assert isinstance(provider, BaseMessageQueueProvider)
-
-
-def test_message_pubsub_queue_trigger_class():
-    from airflow.providers.google.cloud.queues.pubsub import 
PubsubMessageQueueProvider
-
-    provider = PubsubMessageQueueProvider()
-    assert provider.trigger_class() == PubsubPullTrigger
-
-
-def test_scheme_matches():
-    from airflow.providers.google.cloud.queues.pubsub import 
PubsubMessageQueueProvider
-
-    provider = PubsubMessageQueueProvider()
-    assert provider.scheme_matches("google+pubsub")

Reply via email to