This is an automated email from the ASF dual-hosted git repository. jasonliu pushed a commit to branch revert-54494-feat/pubsub-message-queue-provider in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 335d16ebcd14103c199d02734aaf988cdf8ca778 Author: LIU ZHE YOU <[email protected]> AuthorDate: Mon Oct 6 17:48:16 2025 +0800 Revert "AIP-82: implement Google Pub/Sub message queue provider (#54494)" This reverts commit a9d9736863b45f6d37398605e23ea712bef2628a. --- dev/breeze/tests/test_selective_checks.py | 8 +- providers/google/docs/index.rst | 1 - providers/google/docs/message-queues/index.rst | 70 ---------------- providers/google/provider.yaml | 3 - providers/google/pyproject.toml | 4 - .../providers/google/cloud/queues/__init__.py | 16 ---- .../providers/google/cloud/queues/pubsub.py | 68 ---------------- .../airflow/providers/google/get_provider_info.py | 1 - .../pubsub/example_pubsub_message_queue_trigger.py | 93 ---------------------- .../tests/unit/google/cloud/queues/__init__.py | 16 ---- .../tests/unit/google/cloud/queues/test_pubsub.py | 45 ----------- 11 files changed, 4 insertions(+), 321 deletions(-) diff --git a/dev/breeze/tests/test_selective_checks.py b/dev/breeze/tests/test_selective_checks.py index f0253231279..5c00a026921 100644 --- a/dev/breeze/tests/test_selective_checks.py +++ b/dev/breeze/tests/test_selective_checks.py @@ -1847,7 +1847,7 @@ def test_expected_output_push( ), { "selected-providers-list-as-string": "amazon apache.beam apache.cassandra apache.kafka " - "cncf.kubernetes common.compat common.messaging common.sql " + "cncf.kubernetes common.compat common.sql " "facebook google hashicorp http microsoft.azure microsoft.mssql mysql " "openlineage oracle postgres presto salesforce samba sftp ssh trino", "all-python-versions": f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']", @@ -1859,7 +1859,7 @@ def test_expected_output_push( "skip-providers-tests": "false", "docs-build": "true", "docs-list-as-string": "apache-airflow helm-chart amazon apache.beam apache.cassandra " - "apache.kafka cncf.kubernetes common.compat common.messaging common.sql facebook google hashicorp http microsoft.azure " + "apache.kafka cncf.kubernetes common.compat common.sql facebook google hashicorp http microsoft.azure " "microsoft.mssql mysql openlineage oracle postgres " "presto salesforce samba sftp ssh trino", "skip-prek-hooks": ALL_SKIPPED_COMMITS_IF_NO_UI, @@ -1873,7 +1873,7 @@ def test_expected_output_push( { "description": "amazon...google", "test_types": "Providers[amazon] Providers[apache.beam,apache.cassandra," - "apache.kafka,cncf.kubernetes,common.compat,common.messaging,common.sql,facebook," + "apache.kafka,cncf.kubernetes,common.compat,common.sql,facebook," "hashicorp,http,microsoft.azure,microsoft.mssql,mysql," "openlineage,oracle,postgres,presto,salesforce,samba,sftp,ssh,trino] " "Providers[google]", @@ -2117,7 +2117,7 @@ def test_upgrade_to_newer_dependencies( ("providers/google/docs/some_file.rst",), { "docs-list-as-string": "amazon apache.beam apache.cassandra apache.kafka " - "cncf.kubernetes common.compat common.messaging common.sql facebook google hashicorp http " + "cncf.kubernetes common.compat common.sql facebook google hashicorp http " "microsoft.azure microsoft.mssql mysql openlineage oracle " "postgres presto salesforce samba sftp ssh trino", }, diff --git a/providers/google/docs/index.rst b/providers/google/docs/index.rst index 24db6cefe81..3ca89dba953 100644 --- a/providers/google/docs/index.rst +++ b/providers/google/docs/index.rst @@ -36,7 +36,6 @@ Connection types <connections/index> Logging handlers <logging/index> - Message queues <message-queues/index> Secrets backends <secrets-backends/google-cloud-secret-manager-backend> API Authentication backend <api-auth-backend/google-openid> Operators <operators/index> diff --git a/providers/google/docs/message-queues/index.rst b/providers/google/docs/message-queues/index.rst deleted file mode 100644 index dcc30c55713..00000000000 --- a/providers/google/docs/message-queues/index.rst +++ /dev/null @@ -1,70 +0,0 @@ - .. Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - .. http://www.apache.org/licenses/LICENSE-2.0 - - .. Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - -Google Cloud Messaging Queues -============================== - -.. contents:: - :local: - :depth: 2 - -Google Cloud Pub/Sub Queue Provider ------------------------------------- - -Implemented by :class:`~airflow.providers.google.cloud.queues.pubsub.PubsubMessageQueueProvider` - -The Google Cloud Pub/Sub Queue Provider is a message queue provider that uses Google Cloud Pub/Sub as the underlying message queue system. - -It allows you to send and receive messages using Cloud Pub/Sub in your Airflow workflows -with :class:`~airflow.providers.common.messaging.triggers.msg_queue.MessageQueueTrigger` common message queue interface. - -.. include:: /../src/airflow/providers/google/cloud/queues/pubsub.py - :start-after: [START pubsub_message_queue_provider_description] - :end-before: [END pubsub_message_queue_provider_description] - -Pub/Sub Message Queue Trigger ------------------------------ - -Implemented by :class:`~airflow.providers.google.cloud.triggers.pubsub.PubsubPullTrigger` - -Inherited from :class:`~airflow.providers.common.messaging.triggers.msg_queue.MessageQueueTrigger` - - -Wait for a message in a queue -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Below is an example of how you can configure an Airflow DAG to be triggered by a message in Pub/Sub. - -.. exampleinclude:: /../tests/system/google/cloud/pubsub/example_pubsub_message_queue_trigger.py - :language: python - :start-after: [START howto_trigger_pubsub_message_queue] - :end-before: [END howto_trigger_pubsub_message_queue] - -How it works ------------- - -1. **Pub/Sub Message Queue Trigger**: The ``PubsubPullTrigger`` listens for messages from a Google Cloud Pub/Sub subscription. - -2. **Asset and Watcher**: The ``Asset`` abstracts the external entity, the Pub/Sub subscription in this example. - The ``AssetWatcher`` associate a trigger with a name. This name helps you identify which trigger is associated to which - asset. - -3. **Event-Driven DAG**: Instead of running on a fixed schedule, the DAG executes when the asset receives an update - (e.g., a new message in the queue). - -For how to use the trigger, refer to the documentation of the -:ref:`Messaging Trigger <howto/trigger:MessageQueueTrigger>` diff --git a/providers/google/provider.yaml b/providers/google/provider.yaml index 6005f302c17..fc04622e4a4 100644 --- a/providers/google/provider.yaml +++ b/providers/google/provider.yaml @@ -1257,6 +1257,3 @@ auth-backends: logging: - airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler - airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler - -queues: - - airflow.providers.google.cloud.queues.pubsub.PubsubMessageQueueProvider diff --git a/providers/google/pyproject.toml b/providers/google/pyproject.toml index e5f0e86acef..0bb82701d12 100644 --- a/providers/google/pyproject.toml +++ b/providers/google/pyproject.toml @@ -203,9 +203,6 @@ dependencies = [ "http" = [ "apache-airflow-providers-http" ] -"common.messaging" = [ - "apache-airflow-providers-common-messaging" -] [dependency-groups] dev = [ @@ -217,7 +214,6 @@ dev = [ "apache-airflow-providers-apache-cassandra", "apache-airflow-providers-cncf-kubernetes", "apache-airflow-providers-common-compat", - "apache-airflow-providers-common-messaging", "apache-airflow-providers-common-sql", "apache-airflow-providers-facebook", "apache-airflow-providers-http", diff --git a/providers/google/src/airflow/providers/google/cloud/queues/__init__.py b/providers/google/src/airflow/providers/google/cloud/queues/__init__.py deleted file mode 100644 index 13a83393a91..00000000000 --- a/providers/google/src/airflow/providers/google/cloud/queues/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/providers/google/src/airflow/providers/google/cloud/queues/pubsub.py b/providers/google/src/airflow/providers/google/cloud/queues/pubsub.py deleted file mode 100644 index 884cc4d21c1..00000000000 --- a/providers/google/src/airflow/providers/google/cloud/queues/pubsub.py +++ /dev/null @@ -1,68 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -from typing import TYPE_CHECKING - -from airflow.exceptions import AirflowOptionalProviderFeatureException -from airflow.providers.google.cloud.triggers.pubsub import PubsubPullTrigger - -try: - from airflow.providers.common.messaging.providers.base_provider import BaseMessageQueueProvider -except ImportError: - raise AirflowOptionalProviderFeatureException( - "This feature requires the 'common.messaging' provider to be installed in version >= 1.0.1." - ) - -if TYPE_CHECKING: - from airflow.triggers.base import BaseEventTrigger - - -class PubsubMessageQueueProvider(BaseMessageQueueProvider): - """ - Configuration for PubSub integration with common-messaging. - - [START pubsub_message_queue_provider_description] - * It uses ``google+pubsub`` as the scheme for identifying the provider. - * For parameter definitions, take a look at :class:`~airflow.providers.google.cloud.triggers.pubsub.PubsubPullTrigger`. - - .. code-block:: python - - from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger - from airflow.sdk import Asset, AssetWatcher - - trigger = MessageQueueTrigger( - scheme="google+pubsub", - # Additional PubsubPullTrigger parameters as needed - project_id="my_project", - subscription="my_subscription", - ack_messages=True, - max_messages=1, - gcp_conn_id="google_cloud_default", - poke_interval=60.0, - ) - - asset = Asset("pubsub_queue_asset", watchers=[AssetWatcher(name="pubsub_watcher", trigger=trigger)]) - - [END pubsub_message_queue_provider_description] - - """ - - scheme = "google+pubsub" - - def trigger_class(self) -> type[BaseEventTrigger]: - return PubsubPullTrigger # type: ignore[return-value] diff --git a/providers/google/src/airflow/providers/google/get_provider_info.py b/providers/google/src/airflow/providers/google/get_provider_info.py index 72f747c3627..f79dcd19d70 100644 --- a/providers/google/src/airflow/providers/google/get_provider_info.py +++ b/providers/google/src/airflow/providers/google/get_provider_info.py @@ -1518,5 +1518,4 @@ def get_provider_info(): "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler", "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler", ], - "queues": ["airflow.providers.google.cloud.queues.pubsub.PubsubMessageQueueProvider"], } diff --git a/providers/google/tests/system/google/cloud/pubsub/example_pubsub_message_queue_trigger.py b/providers/google/tests/system/google/cloud/pubsub/example_pubsub_message_queue_trigger.py deleted file mode 100644 index e4d92db83ad..00000000000 --- a/providers/google/tests/system/google/cloud/pubsub/example_pubsub_message_queue_trigger.py +++ /dev/null @@ -1,93 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -""" -Example Airflow DAG that demonstrates using Google Cloud Pub/Sub with MessageQueueTrigger -and Asset Watchers for event-driven workflows. - -This example shows how to create a DAG that triggers when messages arrive in a -Google Cloud Pub/Sub subscription using Asset Watchers. - -Prerequisites -------------- - -Before running this example, ensure you have: - -1. A GCP project with Pub/Sub API enabled -2. The following Pub/Sub resources created in your project: - - - Topic: ``test-topic`` - - Subscription: ``test-subscription`` - -You can create these resources using: - -.. code-block:: bash - - # Create topic - gcloud pubsub topics create test-topic --project={PROJECT_ID} - - # Create subscription - gcloud pubsub subscriptions create test-subscription \\ - --topic=test-topic --project={PROJECT_ID} - -How to test ------------ - -1. Ensure the Pub/Sub resources exist (see Prerequisites above) -2. Publish a message to trigger the DAG: - - .. code-block:: bash - - gcloud pubsub topics publish test-topic \\ - --message="Test message" --project={PROJECT_ID} - -3. The DAG will be triggered automatically when the message arrives -""" - -from __future__ import annotations - -# [START howto_trigger_pubsub_message_queue] -from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger -from airflow.providers.standard.operators.empty import EmptyOperator -from airflow.sdk import DAG, Asset, AssetWatcher - -# Define a trigger that listens to a Google Cloud Pub/Sub subscription -trigger = MessageQueueTrigger( - scheme="google+pubsub", - project_id="my-project", - subscription="test-subscription", - ack_messages=True, - max_messages=1, - gcp_conn_id="google_cloud_default", - poke_interval=60.0, -) - -# Define an asset that watches for messages on the Pub/Sub subscription -asset = Asset("pubsub_queue_asset_1", watchers=[AssetWatcher(name="pubsub_watcher_1", trigger=trigger)]) - -with DAG( - dag_id="example_pubsub_message_queue_trigger", - schedule=[asset], -) as dag: - process_message_task = EmptyOperator(task_id="process_pubsub_message") -# [END howto_trigger_pubsub_message_queue] - - -from tests_common.test_utils.system_tests import get_test_run # noqa: E402 - -# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) -test_run = get_test_run(dag) diff --git a/providers/google/tests/unit/google/cloud/queues/__init__.py b/providers/google/tests/unit/google/cloud/queues/__init__.py deleted file mode 100644 index 13a83393a91..00000000000 --- a/providers/google/tests/unit/google/cloud/queues/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/providers/google/tests/unit/google/cloud/queues/test_pubsub.py b/providers/google/tests/unit/google/cloud/queues/test_pubsub.py deleted file mode 100644 index 4b80bae531b..00000000000 --- a/providers/google/tests/unit/google/cloud/queues/test_pubsub.py +++ /dev/null @@ -1,45 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import pytest - -from airflow.providers.google.cloud.triggers.pubsub import PubsubPullTrigger - -pytest.importorskip("airflow.providers.common.messaging.providers.base_provider") - - -def test_message_pubsub_queue_create(): - from airflow.providers.common.messaging.providers.base_provider import BaseMessageQueueProvider - from airflow.providers.google.cloud.queues.pubsub import PubsubMessageQueueProvider - - provider = PubsubMessageQueueProvider() - assert isinstance(provider, BaseMessageQueueProvider) - - -def test_message_pubsub_queue_trigger_class(): - from airflow.providers.google.cloud.queues.pubsub import PubsubMessageQueueProvider - - provider = PubsubMessageQueueProvider() - assert provider.trigger_class() == PubsubPullTrigger - - -def test_scheme_matches(): - from airflow.providers.google.cloud.queues.pubsub import PubsubMessageQueueProvider - - provider = PubsubMessageQueueProvider() - assert provider.scheme_matches("google+pubsub")
