This is an automated email from the ASF dual-hosted git repository.

shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f606e0f1da Add aliases for rebranded Google services (#66344)
0f606e0f1da is described below

commit 0f606e0f1dae40877e0fde785cd5c3eca003567c
Author: Ulada Zakharava <[email protected]>
AuthorDate: Tue May 12 16:00:43 2026 +0200

    Add aliases for rebranded Google services (#66344)
---
 providers/google/docs/connections/gcp_looker.rst   |  32 ++--
 .../docs/integration-logos/Google-Data-Proc.png    | Bin 24546 -> 0 bytes
 providers/google/docs/operators/cloud/dataplex.rst | 204 ++++++++++++++++-----
 providers/google/docs/operators/cloud/dataproc.rst | 120 +++++++++---
 providers/google/docs/operators/cloud/looker.rst   |  24 ++-
 providers/google/provider.yaml                     |  36 ++--
 .../google/cloud/operators/knowledge_catalog.py    | 112 +++++++++++
 .../providers/google/cloud/operators/looker.py     |   7 +-
 .../google/cloud/operators/managed_spark.py        |  56 ++++++
 .../airflow/providers/google/get_provider_info.py  |  46 ++---
 .../google/cloud/dataplex/example_dataplex.py      |   4 +-
 .../cloud/dataplex/example_dataplex_catalog.py     |   2 +-
 .../google/cloud/dataplex/example_dataplex_dp.py   |   4 +-
 .../google/cloud/dataplex/example_dataplex_dq.py   |   4 +-
 .../cloud/dataproc/example_dataproc_batch.py       |   4 +-
 .../dataproc/example_dataproc_batch_deferrable.py  |   2 +-
 .../dataproc/example_dataproc_batch_persistent.py  |   4 +-
 ...proc_cluster_create_existing_stopped_cluster.py |   6 +-
 .../example_dataproc_cluster_deferrable.py         |   2 +-
 .../dataproc/example_dataproc_cluster_diagnose.py  |   2 +-
 .../dataproc/example_dataproc_cluster_generator.py |   4 +-
 .../example_dataproc_cluster_start_stop.py         |   6 +-
 .../dataproc/example_dataproc_cluster_update.py    |   2 +-
 .../cloud/dataproc/example_dataproc_flink.py       |   2 +-
 .../google/cloud/dataproc/example_dataproc_gke.py  |   4 +-
 .../cloud/dataproc/example_dataproc_hadoop.py      |   2 +-
 .../google/cloud/dataproc/example_dataproc_hive.py |   2 +-
 .../google/cloud/dataproc/example_dataproc_pig.py  |   2 +-
 .../cloud/dataproc/example_dataproc_presto.py      |   2 +-
 .../cloud/dataproc/example_dataproc_pyspark.py     |   2 +-
 .../cloud/dataproc/example_dataproc_spark.py       |   2 +-
 .../cloud/dataproc/example_dataproc_spark_async.py |   2 +-
 .../dataproc/example_dataproc_spark_deferrable.py  |   2 +-
 .../cloud/dataproc/example_dataproc_spark_sql.py   |   2 +-
 .../cloud/dataproc/example_dataproc_sparkr.py      |   2 +-
 .../example_dataproc_start_from_trigger.py         |   2 +-
 .../cloud/dataproc/example_dataproc_trino.py       |   2 +-
 .../cloud/dataproc/example_dataproc_workflow.py    |   4 +-
 .../example_dataproc_workflow_deferrable.py        |   4 +-
 .../system/google/cloud/looker/example_looker.py   |   4 +-
 .../unit/google/cloud/operators/test_dataproc.py   |   1 +
 .../cloud/operators/test_knowledge_catalog.py      | 171 +++++++++++++++++
 .../unit/google/cloud/operators/test_looker.py     |   9 +-
 .../google/cloud/operators/test_managed_spark.py   |  37 ++++
 44 files changed, 760 insertions(+), 183 deletions(-)

diff --git a/providers/google/docs/connections/gcp_looker.rst 
b/providers/google/docs/connections/gcp_looker.rst
index 8f3099c8a23..1b7dc344c1a 100644
--- a/providers/google/docs/connections/gcp_looker.rst
+++ b/providers/google/docs/connections/gcp_looker.rst
@@ -17,42 +17,42 @@
 
 .. _howto/connection:gcp_looker:
 
-Google Cloud Platform Looker Connection
-=======================================
+Google Cloud Platform Data Studio (Looker) Connection
+======================================================
 
-Communication between Airflow and Looker is done via `Looker API 
<https://docs.looker.com/reference/api-and-integration/api-reference/v4.0>`_.
-To facilitate the API communication Looker operators use `Looker SDK 
<https://pypi.org/project/looker-sdk/>`_ as an API client.
-Before calling API, Looker SDK needs to authenticate itself using your Looker 
API credentials.
+Communication between Airflow and Data Studio (Looker) is done via `Looker API 
<https://docs.looker.com/reference/api-and-integration/api-reference/v4.0>`_.
+To facilitate the API communication, Data Studio operators use `Looker SDK 
<https://pypi.org/project/looker-sdk/>`_ as an API client.
+Before calling API, Looker SDK needs to authenticate itself using your Data 
Studio API credentials.
 
-* Obtain your Looker API credentials using instructions in the `Looker API 
authentication documentation 
<https://docs.looker.com/reference/api-and-integration/api-auth#authentication_with_an_sdk>`_.
+* Obtain your Data Studio API credentials using instructions in the `Looker 
API authentication documentation 
<https://docs.looker.com/reference/api-and-integration/api-auth#authentication_with_an_sdk>`_.
 
-* Obtain your Looker API path and port as described in the `Looker API 
documentation 
<https://docs.looker.com/reference/api-and-integration/api-getting-started#looker_api_path_and_port>`_.
+* Obtain your Data Studio API path and port as described in the `Looker API 
documentation 
<https://docs.looker.com/reference/api-and-integration/api-getting-started#looker_api_path_and_port>`_.
 
-* Setup a Looker connection in Airflow.
+* Setup a Data Studio connection in Airflow.
 
-The ``HTTP`` connection type provides connection to Looker API.
+The ``HTTP`` connection type provides connection to Data Studio API.
 
 The :class:`~airflow.providers.google.cloud.hooks.looker.LookerHook` uses this 
connection to run
-API requests on a Looker instance issued by 
:class:`~airflow.providers.google.cloud.operators.looker.LookerStartPdtBuildOperator`
 and 
:class:`~airflow.providers.google.cloud.sensors.looker.LookerCheckPdtBuildSensor`.
+API requests on a Data Studio instance issued by 
:class:`~airflow.providers.google.cloud.operators.looker.LookerStartPdtBuildOperator`
 and 
:class:`~airflow.providers.google.cloud.sensors.looker.LookerCheckPdtBuildSensor`.
 
 
 Configuring the Connection
 --------------------------
 
 Host (required)
-    Base URL for Looker API. Do not include /api/* in the URL.
+    Base URL for Data Studio API. Do not include /api/* in the URL.
 
 Login (required)
-    Looker API client id.
+    Data Studio API client id.
 
 Password (required)
-    Looker API client secret.
+    Data Studio API client secret.
 
 Port (optional)
-    Port for Looker API. If hosted on GCP, don't specify the port leaving just 
the host.
+    Port for Data Studio API. If hosted on GCP, don't specify the port leaving 
just the host.
 
 Extra (optional)
-    Specify the extra parameters (as json dictionary) that can be used in 
Looker
+    Specify the extra parameters (as json dictionary) that can be used in Data 
Studio
     connection. The following parameters are supported:
 
     * ``verify_ssl`` - Set to false only if testing locally against 
self-signed certs. Defaults to true if not specified.
@@ -70,7 +70,7 @@ Extra (optional)
 Connection URI
 --------------
 
-A URL configuration example of a Looker connection:
+A URL configuration example of a Data Studio connection:
 
 .. code-block::
 
diff --git a/providers/google/docs/integration-logos/Google-Data-Proc.png 
b/providers/google/docs/integration-logos/Google-Data-Proc.png
deleted file mode 100644
index 914c384f21e..00000000000
Binary files a/providers/google/docs/integration-logos/Google-Data-Proc.png and 
/dev/null differ
diff --git a/providers/google/docs/operators/cloud/dataplex.rst 
b/providers/google/docs/operators/cloud/dataplex.rst
index 28cfe48ee76..f409770ef89 100644
--- a/providers/google/docs/operators/cloud/dataplex.rst
+++ b/providers/google/docs/operators/cloud/dataplex.rst
@@ -15,19 +15,19 @@
     specific language governing permissions and limitations
     under the License.
 
-Google Dataplex Operators
-=========================
+Google Knowledge Catalog Operators
+==================================
 
-Dataplex is an intelligent data fabric that provides unified analytics
+Knowledge Catalog is an intelligent data fabric that provides unified analytics
 and data management across your data lakes, data warehouses, and data marts.
 
-For more information about the task visit `Dataplex production documentation 
<Product documentation <https://cloud.google.com/dataplex/docs/reference>`__
+For more information about the task visit `Knowledge Catalog production 
documentation <Product documentation 
<https://cloud.google.com/dataplex/docs/reference>`__
 
 Create a Task
 -------------
 
 Before you create a dataplex task you need to define its body.
-For more information about the available fields to pass when creating a task, 
visit `Dataplex create task API. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.lakes.tasks#Task>`__
+For more information about the available fields to pass when creating a task, 
visit `Knowledge Catalog create task API. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.lakes.tasks#Task>`__
 
 A simple task configuration can look as followed:
 
@@ -40,6 +40,9 @@ A simple task configuration can look as followed:
 With this configuration we can create the task both synchronously & 
asynchronously:
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateTaskOperator`
 
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogCreateTaskOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex.py
     :language: python
     :dedent: 4
@@ -59,6 +62,9 @@ To delete a task you can use:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteTaskOperator`
 
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogDeleteTaskOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex.py
     :language: python
     :dedent: 4
@@ -72,6 +78,9 @@ To list tasks you can use:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexListTasksOperator`
 
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogListTasksOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex.py
     :language: python
     :dedent: 4
@@ -85,6 +94,9 @@ To get a task you can use:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexGetTaskOperator`
 
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogGetTaskOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex.py
     :language: python
     :dedent: 4
@@ -109,7 +121,7 @@ Create a Lake
 
 Before you create a dataplex lake you need to define its body.
 
-For more information about the available fields to pass when creating a lake, 
visit `Dataplex create lake API. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.lakes#Lake>`__
+For more information about the available fields to pass when creating a lake, 
visit `Knowledge Catalog create lake API. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.lakes#Lake>`__
 
 A simple task configuration can look as followed:
 
@@ -123,6 +135,9 @@ With this configuration we can create the lake:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateLakeOperator`
 
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogCreateLakeOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex.py
     :language: python
     :dedent: 4
@@ -136,6 +151,9 @@ To delete a lake you can use:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteLakeOperator`
 
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogDeleteLakeOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex.py
     :language: python
     :dedent: 4
@@ -145,8 +163,8 @@ To delete a lake you can use:
 Create or update a Data Quality scan
 ------------------------------------
 
-Before you create a Dataplex Data Quality scan you need to define its body.
-For more information about the available fields to pass when creating a Data 
Quality scan, visit `Dataplex create data quality API. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans#DataScan>`__
+Before you create a Knowledge Catalog Data Quality scan you need to define its 
body.
+For more information about the available fields to pass when creating a Data 
Quality scan, visit `Knowledge Catalog create data quality API. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans#DataScan>`__
 
 A simple Data Quality scan configuration can look as followed:
 
@@ -160,6 +178,9 @@ With this configuration we can create or update the Data 
Quality scan:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateOrUpdateDataQualityScanOperator`
 
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogCreateOrUpdateDataQualityScanOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_dq.py
     :language: python
     :dedent: 4
@@ -173,6 +194,9 @@ To get a Data Quality scan you can use:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexGetDataQualityScanOperator`
 
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogGetDataQualityScanOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_dq.py
     :language: python
     :dedent: 4
@@ -188,6 +212,9 @@ To delete a Data Quality scan you can use:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteDataQualityScanOperator`
 
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogDeleteDataQualityScanOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_dq.py
     :language: python
     :dedent: 4
@@ -197,17 +224,20 @@ To delete a Data Quality scan you can use:
 Run a Data Quality scan
 -----------------------
 
-You can run Dataplex Data Quality scan in asynchronous modes to later check 
its status using sensor:
+You can run Knowledge Catalog Data Quality scan in asynchronous modes to later 
check its status using sensor:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexRunDataQualityScanOperator`
 
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogRunDataQualityScanOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_dq.py
     :language: python
     :dedent: 4
     :start-after: [START howto_dataplex_run_data_quality_operator]
     :end-before: [END howto_dataplex_run_data_quality_operator]
 
-To check that running Dataplex Data Quality scan succeeded you can use:
+To check that running Knowledge Catalog Data Quality scan succeeded you can 
use:
 
 
:class:`~airflow.providers.google.cloud.sensors.dataplex.DataplexDataQualityJobStatusSensor`.
 
@@ -232,6 +262,9 @@ To get a Data Quality scan job you can use:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexGetDataQualityScanResultOperator`
 
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogGetDataQualityScanResultOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_dq.py
     :language: python
     :dedent: 4
@@ -249,9 +282,9 @@ Also for this action you can use operator in the deferrable 
mode:
 Create a zone
 -------------
 
-Before you create a Dataplex zone you need to define its body.
+Before you create a Knowledge Catalog zone you need to define its body.
 
-For more information about the available fields to pass when creating a zone, 
visit `Dataplex create zone API. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.lakes.zones#Zone>`__
+For more information about the available fields to pass when creating a zone, 
visit `Knowledge Catalog create zone API. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.lakes.zones#Zone>`__
 
 A simple zone configuration can look as followed:
 
@@ -265,6 +298,9 @@ With this configuration we can create a zone:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateZoneOperator`
 
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogCreateZoneOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_dq.py
     :language: python
     :dedent: 4
@@ -278,6 +314,9 @@ To delete a zone you can use:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteZoneOperator`
 
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogDeleteZoneOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_dq.py
     :language: python
     :dedent: 4
@@ -287,9 +326,9 @@ To delete a zone you can use:
 Create an asset
 ---------------
 
-Before you create a Dataplex asset you need to define its body.
+Before you create a Knowledge Catalog asset you need to define its body.
 
-For more information about the available fields to pass when creating an 
asset, visit `Dataplex create asset API. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.lakes.zones.assets#Asset>`__
+For more information about the available fields to pass when creating an 
asset, visit `Knowledge Catalog create asset API. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.lakes.zones.assets#Asset>`__
 
 A simple asset configuration can look as followed:
 
@@ -303,6 +342,9 @@ With this configuration we can create the asset:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateAssetOperator`
 
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogCreateAssetOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_dq.py
     :language: python
     :dedent: 4
@@ -316,6 +358,9 @@ To delete an asset you can use:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteAssetOperator`
 
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogDeleteAssetOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_dq.py
     :language: python
     :dedent: 4
@@ -325,8 +370,8 @@ To delete an asset you can use:
 Create or update a Data Profile scan
 ------------------------------------
 
-Before you create a Dataplex Data Profile scan you need to define its body.
-For more information about the available fields to pass when creating a Data 
Profile scan, visit `Dataplex create data profile API. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans#DataScan>`__
+Before you create a Knowledge Catalog Data Profile scan you need to define its 
body.
+For more information about the available fields to pass when creating a Data 
Profile scan, visit `Knowledge Catalog create data profile API. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans#DataScan>`__
 
 A simple Data Profile scan configuration can look as followed:
 
@@ -340,6 +385,9 @@ With this configuration we can create or update the Data 
Profile scan:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateOrUpdateDataProfileScanOperator`
 
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogCreateOrUpdateDataProfileScanOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_dp.py
     :language: python
     :dedent: 4
@@ -353,6 +401,9 @@ To get a Data Profile scan you can use:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexGetDataProfileScanOperator`
 
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogGetDataProfileScanOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_dp.py
     :language: python
     :dedent: 4
@@ -368,6 +419,9 @@ To delete a Data Profile scan you can use:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteDataProfileScanOperator`
 
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogDeleteDataProfileScanOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_dp.py
     :language: python
     :dedent: 4
@@ -377,17 +431,20 @@ To delete a Data Profile scan you can use:
 Run a Data Profile scan
 -----------------------
 
-You can run Dataplex Data Profile scan in asynchronous modes to later check 
its status using sensor:
+You can run Knowledge Catalog Data Profile scan in asynchronous modes to later 
check its status using sensor:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexRunDataProfileScanOperator`
 
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogRunDataProfileScanOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_dp.py
     :language: python
     :dedent: 4
     :start-after: [START howto_dataplex_run_data_profile_operator]
     :end-before: [END howto_dataplex_run_data_profile_operator]
 
-To check that running Dataplex Data Profile scan succeeded you can use:
+To check that running Knowledge Catalog Data Profile scan succeeded you can 
use:
 
 
:class:`~airflow.providers.google.cloud.sensors.dataplex.DataplexDataProfileJobStatusSensor`.
 
@@ -412,6 +469,9 @@ To get a Data Profile scan job you can use:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexGetDataProfileScanResultOperator`
 
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogGetDataProfileScanResultOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_dp.py
     :language: python
     :dedent: 4
@@ -419,22 +479,24 @@ To get a Data Profile scan job you can use:
     :end-before: [END howto_dataplex_get_data_profile_job_operator]
 
 
-Google Dataplex Catalog Operators
-=================================
+Google Knowledge Catalog Entry Operators
+========================================
 
-Dataplex Catalog provides a unified inventory of Google Cloud resources, such 
as BigQuery, and other resources,
-such as on-premises resources. Dataplex Catalog automatically retrieves 
metadata for Google Cloud resources,
-and you bring metadata for third-party resources into Dataplex Catalog.
+Knowledge Catalog provides a unified inventory of Google Cloud resources, such 
as BigQuery, and other resources,
+such as on-premises resources. Knowledge Catalog automatically retrieves 
metadata for Google Cloud resources,
+and you bring metadata for third-party resources into Knowledge Catalog.
 
-For more information about Dataplex Catalog visit `Dataplex Catalog production 
documentation <Product documentation 
<https://cloud.google.com/dataplex/docs/catalog-overview>`__
+For more information about Knowledge Catalog visit `Knowledge Catalog 
production documentation <Product documentation 
<https://cloud.google.com/dataplex/docs/catalog-overview>`__
 
 .. _howto/operator:DataplexCatalogCreateEntryGroupOperator:
 
 Create an EntryGroup
 --------------------
 
-To create an Entry Group in specific location in Dataplex Catalog you can
+To create an Entry Group in a specific Knowledge Catalog location you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateEntryGroupOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogCreateEntryGroupOperator``.
 For more information about the available fields to pass when creating an Entry 
Group, visit `Entry Group resource configuration. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.entryGroups#EntryGroup>`__
 
 A simple Entry Group configuration can look as followed:
@@ -448,6 +510,8 @@ A simple Entry Group configuration can look as followed:
 With this configuration you can create an Entry Group resource:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateEntryGroupOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogCreateEntryGroupOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -460,8 +524,10 @@ With this configuration you can create an Entry Group 
resource:
 Delete an EntryGroup
 --------------------
 
-To delete an Entry Group in specific location in Dataplex Catalog you can
+To delete an Entry Group in a specific Knowledge Catalog location you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogDeleteEntryGroupOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogDeleteEntryGroupOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -474,9 +540,11 @@ use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogDe
 List EntryGroups
 ----------------
 
-To list all Entry Groups in specific location in Dataplex Catalog you can
+To list all Entry Groups in a specific Knowledge Catalog location you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogListEntryGroupsOperator`.
 This operator also supports filtering and ordering the result of the operation.
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogListEntryGroupsOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -489,8 +557,10 @@ This operator also supports filtering and ordering the 
result of the operation.
 Get an EntryGroup
 -----------------
 
-To retrieve an Entry Group in specific location in Dataplex Catalog you can
+To retrieve an Entry Group in a specific Knowledge Catalog location you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogGetEntryGroupOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogGetEntryGroupOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -503,8 +573,10 @@ use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogGe
 Update an EntryGroup
 --------------------
 
-To update an Entry Group in specific location in Dataplex Catalog you can
+To update an Entry Group in a specific Knowledge Catalog location you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUpdateEntryGroupOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogUpdateEntryGroupOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -517,8 +589,10 @@ use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUp
 Create an EntryType
 --------------------
 
-To create an Entry Type in specific location in Dataplex Catalog you can
+To create an Entry Type in a specific Knowledge Catalog location you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateEntryTypeOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogCreateEntryTypeOperator``.
 For more information about the available fields to pass when creating an Entry 
Type, visit `Entry Type resource configuration. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.entryTypes#EntryType>`__
 
 A simple Entry Group configuration can look as followed:
@@ -532,6 +606,8 @@ A simple Entry Group configuration can look as followed:
 With this configuration you can create an Entry Type resource:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateEntryTypeOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogCreateEntryTypeOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -544,8 +620,10 @@ With this configuration you can create an Entry Type 
resource:
 Delete an EntryType
 --------------------
 
-To delete an Entry Type in specific location in Dataplex Catalog you can
+To delete an Entry Type in a specific Knowledge Catalog location you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogDeleteEntryTypeOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogDeleteEntryTypeOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -558,9 +636,11 @@ use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogDe
 List EntryTypes
 ----------------
 
-To list all Entry Types in specific location in Dataplex Catalog you can
+To list all Entry Types in a specific Knowledge Catalog location you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogListEntryTypesOperator`.
 This operator also supports filtering and ordering the result of the operation.
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogListEntryTypesOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -573,8 +653,10 @@ This operator also supports filtering and ordering the 
result of the operation.
 Get an EntryType
 -----------------
 
-To retrieve an Entry Group in specific location in Dataplex Catalog you can
+To retrieve an Entry Group in a specific Knowledge Catalog location you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogGetEntryTypeOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogGetEntryTypeOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -587,8 +669,10 @@ use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogGe
 Update an EntryType
 --------------------
 
-To update an Entry Type in specific location in Dataplex Catalog you can
+To update an Entry Type in a specific Knowledge Catalog location you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUpdateEntryTypeOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogUpdateEntryTypeOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -601,8 +685,10 @@ use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUp
 Create an AspectType
 --------------------
 
-To create an Aspect Type in specific location in Dataplex Catalog you can
+To create an Aspect Type in a specific Knowledge Catalog location you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateAspectTypeOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogCreateAspectTypeOperator``.
 For more information about the available fields to pass when creating an 
Aspect Type, visit `Aspect Type resource configuration. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.aspectTypes#AspectType>`__
 
 A simple Aspect Group configuration can look as followed:
@@ -616,6 +702,8 @@ A simple Aspect Group configuration can look as followed:
 With this configuration you can create an Aspect Type resource:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateAspectTypeOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogCreateAspectTypeOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -628,8 +716,10 @@ With this configuration you can create an Aspect Type 
resource:
 Delete an AspectType
 --------------------
 
-To delete an Aspect Type in specific location in Dataplex Catalog you can
+To delete an Aspect Type in a specific Knowledge Catalog location you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogDeleteAspectTypeOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogDeleteAspectTypeOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -642,9 +732,11 @@ use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogDe
 List AspectTypes
 ----------------
 
-To list all Aspect Types in specific location in Dataplex Catalog you can
+To list all Aspect Types in a specific Knowledge Catalog location you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogListAspectTypesOperator`.
 This operator also supports filtering and ordering the result of the operation.
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogListAspectTypesOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -657,8 +749,10 @@ This operator also supports filtering and ordering the 
result of the operation.
 Get an AspectType
 -----------------
 
-To retrieve an Aspect Group in specific location in Dataplex Catalog you can
+To retrieve an Aspect Group in a specific Knowledge Catalog location you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogGetAspectTypeOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogGetAspectTypeOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -671,8 +765,10 @@ use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogGe
 Update an AspectType
 --------------------
 
-To update an Aspect Type in specific location in Dataplex Catalog you can
+To update an Aspect Type in a specific Knowledge Catalog location you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUpdateAspectTypeOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is 
``KnowledgeCatalogUpdateAspectTypeOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -685,8 +781,10 @@ use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUp
 Create an Entry
 ---------------
 
-To create an Entry in specific location in Dataplex Catalog you can
+To create an Entry in a specific Knowledge Catalog location you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateEntryOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogCreateEntryOperator``.
 For more information about the available fields to pass when creating an 
Entry, visit `Entry resource configuration. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.entryGroups.entries>`__
 
 A simple Entry configuration can look as followed:
@@ -700,6 +798,8 @@ A simple Entry configuration can look as followed:
 With this configuration you can create an Entry resource:
 
 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateEntryOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogCreateEntryOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -712,8 +812,10 @@ With this configuration you can create an Entry resource:
 Delete an Entry
 ---------------
 
-To delete an Entry in specific location in Dataplex Catalog you can
+To delete an Entry in a specific Knowledge Catalog location you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogDeleteEntryOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogDeleteEntryOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -726,9 +828,11 @@ use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogDe
 List Entries
 ------------
 
-To list all Entries in specific location in Dataplex Catalog you can
+To list all Entries in a specific Knowledge Catalog location you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogListEntriesOperator`.
 This operator also supports filtering and ordering the result of the operation.
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogListEntriesOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -741,8 +845,10 @@ This operator also supports filtering and ordering the 
result of the operation.
 Get an Entry
 ------------
 
-To retrieve an Entry in specific location in Dataplex Catalog you can
+To retrieve an Entry in a specific Knowledge Catalog location you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogGetEntryOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogGetEntryOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -755,8 +861,10 @@ use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogGe
 Update an Entry
 ---------------
 
-To update an Entry in specific location in Dataplex Catalog you can
+To update an Entry in a specific Knowledge Catalog location you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUpdateEntryOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogUpdateEntryOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -769,8 +877,10 @@ use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUp
 Look up a single Entry
 ----------------------
 
-To look up a single Entry by name using the permission on the source system in 
Dataplex Catalog you can
+To look up a single Entry by name using the permission on the source system in 
Knowledge Catalog you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogLookupEntryOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogLookupEntryOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
@@ -783,8 +893,10 @@ use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogLo
 Search Entries
 --------------
 
-To search for Entries matching the given query and scope in Dataplex Catalog 
you can
+To search for Entries matching the given query and scope in Knowledge Catalog 
you can
 use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogSearchEntriesOperator`
+The executable example below still imports the compatibility name shown above.
+The preferred alias for new code is ``KnowledgeCatalogSearchEntriesOperator``.
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
     :language: python
diff --git a/providers/google/docs/operators/cloud/dataproc.rst 
b/providers/google/docs/operators/cloud/dataproc.rst
index 039a0e8e4af..115b1001eb9 100644
--- a/providers/google/docs/operators/cloud/dataproc.rst
+++ b/providers/google/docs/operators/cloud/dataproc.rst
@@ -15,15 +15,15 @@
     specific language governing permissions and limitations
     under the License.
 
-Google Cloud Dataproc Operators
-===============================
+Google Managed Service for Apache Spark Operators
+=================================================
 
-Dataproc is a managed Apache Spark and Apache Hadoop service that lets you
+Managed Service for Apache Spark is a managed Apache Spark and Apache Hadoop 
service that lets you
 take advantage of open source data tools for batch processing, querying, 
streaming and machine learning.
-Dataproc automation helps you create clusters quickly, manage them easily, and
+Managed Spark automation helps you create clusters quickly, manage them 
easily, and
 save money by turning clusters off when you don't need them.
 
-For more information about the service visit `Dataproc production 
documentation <Product documentation 
<https://cloud.google.com/dataproc/docs/reference>`__
+For more information about the service visit `Managed Spark production 
documentation <Product documentation 
<https://cloud.google.com/dataproc/docs/reference>`__
 
 Prerequisite Tasks
 ------------------
@@ -83,22 +83,22 @@ in that environment before relying on the corresponding 
Airflow operator in a pr
 Create a Cluster
 ----------------
 
-When you create a Dataproc cluster, you have the option to choose Compute 
Engine as the deployment platform.
-In this configuration, Dataproc automatically provisions the required Compute 
Engine VM instances to run the cluster.
+When you create a Managed Spark cluster, you have the option to choose Compute 
Engine as the deployment platform.
+In this configuration, Managed Spark automatically provisions the required 
Compute Engine VM instances to run the cluster.
 The VM instances are used for the main node, primary worker and secondary 
worker nodes (if specified).
-These VM instances are created and managed by Compute Engine, while Dataproc 
takes care of configuring the software and
+These VM instances are created and managed by Compute Engine, while Managed 
Spark takes care of configuring the software and
 orchestration required for the big data processing tasks.
 By providing the configuration for your nodes, you describe the configuration 
of primary and
 secondary nodes, and status of a cluster of Compute Engine instances.
 Configuring secondary worker nodes, you can specify the number of workers and 
their types. By
 enabling the Preemptible option to use Preemptible VMs (equivalent to Spot 
instances) for those nodes, you
-can take advantage of the cost savings provided by these instances for your 
Dataproc workloads.
+can take advantage of the cost savings provided by these instances for your 
Managed Spark workloads.
 The primary node, which typically hosts the cluster main and various control 
services, does not have the Preemptible
 option because it's crucial for the primary node to maintain stability and 
availability.
 Once a cluster is created, the configuration settings, including the 
preemptibility of secondary worker nodes,
 cannot be modified directly.
 
-For more information about the available fields to pass when creating a 
cluster, visit `Dataproc create cluster API. 
<https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#Cluster>`__
+For more information about the available fields to pass when creating a 
cluster, visit `Managed Spark create cluster API. 
<https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#Cluster>`__
 
 A cluster configuration can look as followed:
 
@@ -111,24 +111,28 @@ A cluster configuration can look as followed:
 With this configuration we can create the cluster:
 
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateClusterOperator`
 
+The executable example below still imports the compatibility name
+``DataprocCreateClusterOperator``. The preferred alias for new code is
+``ManagedSparkCreateClusterOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataproc/example_dataproc_hive.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_dataproc_create_cluster_operator]
     :end-before: [END how_to_cloud_dataproc_create_cluster_operator]
 
-Dataproc on GKE deploys Dataproc virtual clusters on a GKE cluster. Unlike 
Dataproc on Compute Engine clusters,
-Dataproc on GKE virtual clusters do not include separate main and worker VMs. 
Instead, when you create a Dataproc on
-GKE virtual cluster, Dataproc on GKE creates node pools within a GKE cluster. 
Dataproc on GKE jobs are run as pods on
+Managed Spark on GKE deploys Managed Spark virtual clusters on a GKE cluster. 
Unlike Managed Spark on Compute Engine clusters,
+Managed Spark on GKE virtual clusters do not include separate main and worker 
VMs. Instead, when you create a Managed Spark on
+GKE virtual cluster, Managed Spark on GKE creates node pools within a GKE 
cluster. Managed Spark on GKE jobs are run as pods on
 these node pools. The node pools and scheduling of pods on the node pools are 
managed by GKE.
 
-When creating a GKE Dataproc cluster, you can specify the usage of Preemptible 
VMs for the underlying compute resources.
+When creating a GKE Managed Spark cluster, you can specify the usage of 
Preemptible VMs for the underlying compute resources.
 GKE supports the use of Preemptible VMs as a cost-saving measure.
 By enabling Preemptible VMs, GKE will provision the cluster nodes using 
Preemptible VMs. Or you can create nodes as
 Spot VM instances, which are the latest update to legacy preemptible VMs.
-This can be beneficial for running Dataproc workloads on GKE while optimizing 
costs.
+This can be beneficial for running Managed Spark workloads on GKE while 
optimizing costs.
 
-To create Dataproc cluster in Google Kubernetes Engine you could pass cluster 
configuration:
+To create Managed Spark cluster in Google Kubernetes Engine you could pass 
cluster configuration:
 
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataproc/example_dataproc_gke.py
     :language: python
@@ -139,13 +143,17 @@ To create Dataproc cluster in Google Kubernetes Engine 
you could pass cluster co
 With this configuration we can create the cluster:
 
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateClusterOperator`
 
+The executable example below still imports the compatibility name
+``DataprocCreateClusterOperator``. The preferred alias for new code is
+``ManagedSparkCreateClusterOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataproc/example_dataproc_gke.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_dataproc_create_cluster_operator_in_gke]
     :end-before: [END how_to_cloud_dataproc_create_cluster_operator_in_gke]
 
-You can also create Dataproc cluster with optional component Presto.
+You can also create Managed Spark cluster with optional component Presto.
 To do so, please use the following configuration.
 Note that default image might not support the chosen optional component.
 If this is your case, please specify correct ``image_version`` that you can 
find in the
@@ -157,7 +165,7 @@ If this is your case, please specify correct 
``image_version`` that you can find
     :start-after: [START how_to_cloud_dataproc_create_cluster]
     :end-before: [END how_to_cloud_dataproc_create_cluster]
 
-You can also create Dataproc cluster with optional component Trino.
+You can also create Managed Spark cluster with optional component Trino.
 To do so, please use the following configuration.
 Note that default image might not support the chosen optional component.
 If this is your case, please specify correct ``image_version`` that you can 
find in the
@@ -193,15 +201,19 @@ You can generate and use config as followed:
 
 Diagnose a cluster
 ------------------
-Dataproc supports the collection of `cluster diagnostic information 
<https://cloud.google.com/dataproc/docs/support/diagnose-cluster-command#diagnostic_summary_and_archive_contents>`_
-like system, Spark, Hadoop, and Dataproc logs, cluster configuration files 
that can be used to troubleshoot a Dataproc cluster or job.
+Managed Spark supports the collection of `cluster diagnostic information 
<https://cloud.google.com/dataproc/docs/support/diagnose-cluster-command#diagnostic_summary_and_archive_contents>`_
+like system, Spark, Hadoop, and Managed Spark logs, cluster configuration 
files that can be used to troubleshoot a Managed Spark cluster or job.
 It is important to note that this information can only be collected before the 
cluster is deleted.
 For more information about the available fields to pass when diagnosing a 
cluster, visit
-`Dataproc diagnose cluster API. 
<https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters/diagnose>`_
+`Managed Spark diagnose cluster API. 
<https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters/diagnose>`_
 
-To diagnose a Dataproc cluster use:
+To diagnose a Managed Spark cluster use:
 
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocDiagnoseClusterOperator`.
 
+The executable example below still imports the compatibility name
+``DataprocDiagnoseClusterOperator``. The preferred alias for new code is
+``ManagedSparkDiagnoseClusterOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataproc/example_dataproc_cluster_diagnose.py
     :language: python
     :dedent: 0
@@ -220,7 +232,7 @@ Update a cluster
 ----------------
 You can scale the cluster up or down by providing a cluster config and a 
updateMask.
 In the updateMask argument you specifies the path, relative to Cluster, of the 
field to update.
-For more information on updateMask and other parameters take a look at 
`Dataproc update cluster API. 
<https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters/patch>`__
+For more information on updateMask and other parameters take a look at 
`Managed Spark update cluster API. 
<https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters/patch>`__
 
 An example of a new cluster config and the updateMask:
 
@@ -233,6 +245,10 @@ An example of a new cluster config and the updateMask:
 To update a cluster you can use:
 
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocUpdateClusterOperator`
 
+The executable example below still imports the compatibility name
+``DataprocUpdateClusterOperator``. The preferred alias for new code is
+``ManagedSparkUpdateClusterOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataproc/example_dataproc_cluster_update.py
     :language: python
     :dedent: 4
@@ -253,6 +269,10 @@ Starting a cluster
 To start a cluster you can use the
 
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocStartClusterOperator`:
 
+The executable example below still imports the compatibility name
+``DataprocStartClusterOperator``. The preferred alias for new code is
+``ManagedSparkStartClusterOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataproc/example_dataproc_cluster_start_stop.py
     :language: python
     :dedent: 4
@@ -265,6 +285,10 @@ Stopping a cluster
 To stop a cluster you can use the
 
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocStopClusterOperator`:
 
+The executable example below still imports the compatibility name
+``DataprocStopClusterOperator``. The preferred alias for new code is
+``ManagedSparkStopClusterOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataproc/example_dataproc_cluster_start_stop.py
     :language: python
     :dedent: 4
@@ -277,6 +301,10 @@ Deleting a cluster
 To delete a cluster you can use:
 
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocDeleteClusterOperator`.
 
+The executable example below still imports the compatibility name
+``DataprocDeleteClusterOperator``. The preferred alias for new code is
+``ManagedSparkDeleteClusterOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataproc/example_dataproc_hive.py
     :language: python
     :dedent: 4
@@ -294,9 +322,9 @@ You can use deferrable mode for this action in order to run 
the operator asynchr
 Submit a job to a cluster
 -------------------------
 
-Dataproc supports submitting jobs of different big data components.
+Managed Spark supports submitting jobs of different big data components.
 The list currently includes Spark, PySpark, Hadoop, Trino, Pig, Flink and Hive.
-For more information on versions and images take a look at `Cloud Dataproc 
Image version list 
<https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-versions>`__
+For more information on versions and images take a look at `Cloud Managed 
Spark Image version list 
<https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-versions>`__
 
 To submit a job to the cluster you need to provide a job source file. The job 
source file can be on GCS, the cluster or on your local
 file system. You can specify a file:/// path to refer to a local file on a 
cluster's primary node.
@@ -304,6 +332,10 @@ file system. You can specify a file:/// path to refer to a 
local file on a clust
 The job configuration can be submitted by using:
 
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`.
 
+The executable example below still imports the compatibility name
+``DataprocSubmitJobOperator``. The preferred alias for new code is
+``ManagedSparkSubmitJobOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataproc/example_dataproc_pyspark.py
     :language: python
     :dedent: 4
@@ -408,11 +440,15 @@ Example of the configuration for a Flink Job:
 Working with workflows templates
 --------------------------------
 
-Dataproc supports creating workflow templates that can be triggered later on.
+Managed Spark supports creating workflow templates that can be triggered later 
on.
 
 A workflow template can be created using:
 
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateWorkflowTemplateOperator`.
 
+The executable example below still imports the compatibility name
+``DataprocCreateWorkflowTemplateOperator``. The preferred alias for new code is
+``ManagedSparkCreateWorkflowTemplateOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataproc/example_dataproc_workflow.py
     :language: python
     :dedent: 4
@@ -422,6 +458,10 @@ A workflow template can be created using:
 Once a workflow is created users can trigger it using
 
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateWorkflowTemplateOperator`:
 
+The executable example below still imports the compatibility name
+``DataprocInstantiateWorkflowTemplateOperator``. The preferred alias for new 
code is
+``ManagedSparkInstantiateWorkflowTemplateOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataproc/example_dataproc_workflow.py
     :language: python
     :dedent: 4
@@ -439,6 +479,10 @@ Also for all this action you can use operator in the 
deferrable mode:
 The inline operator is an alternative. It creates a workflow, run it, and 
delete it afterwards:
 
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator`:
 
+The executable example below still imports the compatibility name
+``DataprocInstantiateInlineWorkflowTemplateOperator``. The preferred alias for 
new code is
+``ManagedSparkInstantiateInlineWorkflowTemplateOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataproc/example_dataproc_workflow.py
     :language: python
     :dedent: 4
@@ -457,18 +501,22 @@ Also for all this action you can use operator in the 
deferrable mode:
 Create a Batch
 --------------
 
-Dataproc supports creating a batch workload.
+Managed Spark supports creating a batch workload.
 
 A batch can be created using:
 
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateBatchOperator`.
 
+The executable example below still imports the compatibility name
+``DataprocCreateBatchOperator``. The preferred alias for new code is
+``ManagedSparkCreateBatchOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataproc/example_dataproc_batch.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_dataproc_create_batch_operator]
     :end-before: [END how_to_cloud_dataproc_create_batch_operator]
 
-For creating a batch with Persistent History Server first you should create a 
Dataproc Cluster
+For creating a batch with Persistent History Server first you should create a 
Managed Spark Cluster
 with specific parameters. Documentation how create cluster you can find
 `here 
<https://cloud.google.com/dataproc/docs/concepts/jobs/history-server#setting_up_a_persistent_history_server>`__:
 
@@ -509,6 +557,10 @@ Get a Batch
 To get a batch you can use:
 
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocGetBatchOperator`.
 
+The executable example below still imports the compatibility name
+``DataprocGetBatchOperator``. The preferred alias for new code is
+``ManagedSparkGetBatchOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataproc/example_dataproc_batch.py
     :language: python
     :dedent: 4
@@ -521,6 +573,10 @@ List a Batch
 To get a list of exists batches you can use:
 
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocListBatchesOperator`.
 
+The executable example below still imports the compatibility name
+``DataprocListBatchesOperator``. The preferred alias for new code is
+``ManagedSparkListBatchesOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataproc/example_dataproc_batch.py
     :language: python
     :dedent: 4
@@ -533,6 +589,10 @@ Delete a Batch
 To delete a batch you can use:
 
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocDeleteBatchOperator`.
 
+The executable example below still imports the compatibility name
+``DataprocDeleteBatchOperator``. The preferred alias for new code is
+``ManagedSparkDeleteBatchOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataproc/example_dataproc_batch.py
     :language: python
     :dedent: 4
@@ -545,6 +605,10 @@ Cancel a Batch Operation
 To cancel a operation you can use:
 
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCancelOperationOperator`.
 
+The executable example below still imports the compatibility name
+``DataprocCancelOperationOperator``. The preferred alias for new code is
+``ManagedSparkCancelOperationOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/dataproc/example_dataproc_batch.py
     :language: python
     :dedent: 4
diff --git a/providers/google/docs/operators/cloud/looker.rst 
b/providers/google/docs/operators/cloud/looker.rst
index d9751e0388f..085b5f9bb23 100644
--- a/providers/google/docs/operators/cloud/looker.rst
+++ b/providers/google/docs/operators/cloud/looker.rst
@@ -15,14 +15,14 @@
     specific language governing permissions and limitations
     under the License.
 
-Google Cloud Looker Operators
-===============================
+Google Data Studio (Looker) Operators
+======================================
 
-Looker is a business intelligence software and big data analytics platform that
+Data Studio (Looker) is a business intelligence software and big data 
analytics platform that
 helps you explore, analyze and share real-time business analytics easily.
 
-Looker has a Public API and associated SDK clients in different languages,
-which allow programmatic access to the Looker data platform.
+Data Studio (Looker) has a Public API and associated SDK clients in different 
languages,
+which allow programmatic access to the Data Studio platform.
 
 For more information visit `Looker API documentation 
<https://docs.looker.com/reference/api-and-integration>`_.
 
@@ -39,16 +39,20 @@ To use these operators, you must do a few things:
 
 Detailed information is available for :doc:`Installation 
<apache-airflow:installation/index>`.
 
-* Setup a Looker connection in Airflow. You can check 
:doc:`apache-airflow:howto/connection` and :doc:`/connections/gcp_looker`
+* Setup a Data Studio connection in Airflow. You can check 
:doc:`apache-airflow:howto/connection` and :doc:`/connections/gcp_looker`
 
 Start a PDT materialization job
 -------------------------------
 
-To submit a PDT materialization job to Looker you need to provide a model and 
view name.
+To submit a PDT materialization job to Data Studio you need to provide a model 
and view name.
 
 The job configuration can be submitted in synchronous (blocking) mode by using:
 
:class:`~airflow.providers.google.cloud.operators.looker.LookerStartPdtBuildOperator`.
 
+The executable example below still imports the compatibility name
+``LookerStartPdtBuildOperator``. The preferred alias for new code is
+``DataStudioStartPdtBuildOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/looker/example_looker.py
     :language: python
     :dedent: 4
@@ -60,6 +64,10 @@ Alternatively, the job configuration can be submitted in 
asynchronous mode by us
 
:class:`~airflow.providers.google.cloud.operators.looker.LookerStartPdtBuildOperator`
 and
 
:class:`~airflow.providers.google.cloud.sensors.looker.LookerCheckPdtBuildSensor`.
 
+The executable example below still imports the compatibility name
+``LookerStartPdtBuildOperator``. The preferred alias for new code is
+``DataStudioStartPdtBuildOperator``.
+
 .. exampleinclude:: 
/../../google/tests/system/google/cloud/looker/example_looker.py
     :language: python
     :dedent: 4
@@ -67,4 +75,4 @@ Alternatively, the job configuration can be submitted in 
asynchronous mode by us
     :end-before: [END cloud_looker_async_start_pdt_sensor]
 
 There are more arguments to provide in the jobs than the examples show.
-For the complete list of arguments take a look at Looker operator arguments at 
:class:`airflow.providers.google.cloud.operators.looker.LookerStartPdtBuildOperator`
+For the complete list of arguments take a look at Data Studio operator 
arguments at 
:class:`airflow.providers.google.cloud.operators.looker.LookerStartPdtBuildOperator`
diff --git a/providers/google/provider.yaml b/providers/google/provider.yaml
index 4eed2dd95e2..1b9a1ace51d 100644
--- a/providers/google/provider.yaml
+++ b/providers/google/provider.yaml
@@ -311,10 +311,6 @@ integrations:
       - /docs/apache-airflow-providers-google/operators/cloud/compute_ssh.rst
     logo: /docs/integration-logos/Compute-Engine.png
     tags: [gcp]
-  - integration-name: Google Data Proc
-    external-doc-url: https://cloud.google.com/dataproc/
-    logo: /docs/integration-logos/Google-Data-Proc.png
-    tags: [gcp]
   - integration-name: Google Dataflow
     external-doc-url: https://cloud.google.com/dataflow/
     how-to-guide:
@@ -327,7 +323,7 @@ integrations:
       - /docs/apache-airflow-providers-google/operators/cloud/datafusion.rst
     logo: /docs/integration-logos/Google-Data-Fusion.png
     tags: [gcp]
-  - integration-name: Google Dataplex
+  - integration-name: Google Knowledge Catalog
     external-doc-url: https://cloud.google.com/dataplex/
     how-to-guide:
       - /docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
@@ -343,7 +339,7 @@ integrations:
     how-to-guide:
       - 
/docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst
     tags: [gcp]
-  - integration-name: Google Dataproc
+  - integration-name: Google Managed Service for Apache Spark
     external-doc-url: https://cloud.google.com/dataproc/
     how-to-guide:
       - /docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
@@ -440,7 +436,7 @@ integrations:
     how-to-guide:
       - /docs/apache-airflow-providers-google/operators/cloud/vertex_ai.rst
     tags: [gcp]
-  - integration-name: Google Looker
+  - integration-name: Google Data Studio
     external-doc-url: https://cloud.google.com/looker
     logo: /docs/integration-logos/Cloud-Looker.png
     how-to-guide:
@@ -516,18 +512,20 @@ operators:
   - integration-name: Google Data Fusion
     python-modules:
       - airflow.providers.google.cloud.operators.datafusion
-  - integration-name: Google Dataplex
+  - integration-name: Google Knowledge Catalog
     python-modules:
       - airflow.providers.google.cloud.operators.dataplex
+      - airflow.providers.google.cloud.operators.knowledge_catalog
   - integration-name: Google Dataprep
     python-modules:
       - airflow.providers.google.cloud.operators.dataprep
   - integration-name: Google Dataproc Metastore
     python-modules:
       - airflow.providers.google.cloud.operators.dataproc_metastore
-  - integration-name: Google Dataproc
+  - integration-name: Google Managed Service for Apache Spark
     python-modules:
       - airflow.providers.google.cloud.operators.dataproc
+      - airflow.providers.google.cloud.operators.managed_spark
   - integration-name: Google Datastore
     python-modules:
       - airflow.providers.google.cloud.operators.datastore
@@ -613,7 +611,7 @@ operators:
       - airflow.providers.google.cloud.operators.vertex_ai.generative_model
       - airflow.providers.google.cloud.operators.vertex_ai.feature_store
       - airflow.providers.google.cloud.operators.vertex_ai.ray
-  - integration-name: Google Looker
+  - integration-name: Google Data Studio
     python-modules:
       - airflow.providers.google.cloud.operators.looker
   - integration-name: Google Cloud Dataform
@@ -663,10 +661,10 @@ sensors:
   - integration-name: Google Dataprep
     python-modules:
       - airflow.providers.google.cloud.sensors.dataprep
-  - integration-name: Google Dataplex
+  - integration-name: Google Knowledge Catalog
     python-modules:
       - airflow.providers.google.cloud.sensors.dataplex
-  - integration-name: Google Dataproc
+  - integration-name: Google Managed Service for Apache Spark
     python-modules:
       - airflow.providers.google.cloud.sensors.dataproc
   - integration-name: Google Dataproc Metastore
@@ -693,7 +691,7 @@ sensors:
   - integration-name: Google Display&Video 360
     python-modules:
       - airflow.providers.google.marketing_platform.sensors.display_video
-  - integration-name: Google Looker
+  - integration-name: Google Data Studio
     python-modules:
       - airflow.providers.google.cloud.sensors.looker
   - integration-name: Google Cloud Dataform
@@ -779,7 +777,7 @@ hooks:
   - integration-name: Google Data Fusion
     python-modules:
       - airflow.providers.google.cloud.hooks.datafusion
-  - integration-name: Google Dataplex
+  - integration-name: Google Knowledge Catalog
     python-modules:
       - airflow.providers.google.cloud.hooks.dataplex
   - integration-name: Google Dataprep
@@ -788,7 +786,7 @@ hooks:
   - integration-name: Google Dataproc Metastore
     python-modules:
       - airflow.providers.google.cloud.hooks.dataproc_metastore
-  - integration-name: Google Dataproc
+  - integration-name: Google Managed Service for Apache Spark
     python-modules:
       - airflow.providers.google.cloud.hooks.dataproc
   - integration-name: Google Datastore
@@ -900,7 +898,7 @@ hooks:
       - airflow.providers.google.cloud.hooks.vertex_ai.prediction_service
       - airflow.providers.google.cloud.hooks.vertex_ai.feature_store
       - airflow.providers.google.cloud.hooks.vertex_ai.ray
-  - integration-name: Google Looker
+  - integration-name: Google Data Studio
     python-modules:
       - airflow.providers.google.cloud.hooks.looker
   - integration-name: Google Cloud Dataform
@@ -958,10 +956,10 @@ triggers:
   - integration-name: Google Data Fusion
     python-modules:
       - airflow.providers.google.cloud.triggers.datafusion
-  - integration-name: Google Dataplex
+  - integration-name: Google Knowledge Catalog
     python-modules:
       - airflow.providers.google.cloud.triggers.dataplex
-  - integration-name: Google Dataproc
+  - integration-name: Google Managed Service for Apache Spark
     python-modules:
       - airflow.providers.google.cloud.triggers.dataproc
   - integration-name: Google Cloud Storage (GCS)
@@ -1331,7 +1329,7 @@ connection-types:
           type: ["string", "null"]
           format: password
   - hook-class-name: airflow.providers.google.cloud.hooks.looker.LookerHook
-    hook-name: "Google Looker"
+    hook-name: "Google Data Studio"
     connection-type: gcp_looker
 
 extra-links:
diff --git 
a/providers/google/src/airflow/providers/google/cloud/operators/knowledge_catalog.py
 
b/providers/google/src/airflow/providers/google/cloud/operators/knowledge_catalog.py
new file mode 100644
index 00000000000..a6b34177dc3
--- /dev/null
+++ 
b/providers/google/src/airflow/providers/google/cloud/operators/knowledge_catalog.py
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TypeAlias
+
+from airflow.providers.google.cloud.operators.dataplex import (
+    DataplexCatalogCreateAspectTypeOperator,
+    DataplexCatalogCreateEntryGroupOperator,
+    DataplexCatalogCreateEntryOperator,
+    DataplexCatalogCreateEntryTypeOperator,
+    DataplexCatalogDeleteAspectTypeOperator,
+    DataplexCatalogDeleteEntryGroupOperator,
+    DataplexCatalogDeleteEntryOperator,
+    DataplexCatalogDeleteEntryTypeOperator,
+    DataplexCatalogGetAspectTypeOperator,
+    DataplexCatalogGetEntryGroupOperator,
+    DataplexCatalogGetEntryOperator,
+    DataplexCatalogGetEntryTypeOperator,
+    DataplexCatalogListAspectTypesOperator,
+    DataplexCatalogListEntriesOperator,
+    DataplexCatalogListEntryGroupsOperator,
+    DataplexCatalogListEntryTypesOperator,
+    DataplexCatalogLookupEntryOperator,
+    DataplexCatalogSearchEntriesOperator,
+    DataplexCatalogUpdateAspectTypeOperator,
+    DataplexCatalogUpdateEntryGroupOperator,
+    DataplexCatalogUpdateEntryOperator,
+    DataplexCatalogUpdateEntryTypeOperator,
+    DataplexCreateAssetOperator,
+    DataplexCreateLakeOperator,
+    DataplexCreateOrUpdateDataProfileScanOperator,
+    DataplexCreateOrUpdateDataQualityScanOperator,
+    DataplexCreateTaskOperator,
+    DataplexCreateZoneOperator,
+    DataplexDeleteAssetOperator,
+    DataplexDeleteDataProfileScanOperator,
+    DataplexDeleteDataQualityScanOperator,
+    DataplexDeleteLakeOperator,
+    DataplexDeleteTaskOperator,
+    DataplexDeleteZoneOperator,
+    DataplexGetDataProfileScanOperator,
+    DataplexGetDataProfileScanResultOperator,
+    DataplexGetDataQualityScanOperator,
+    DataplexGetDataQualityScanResultOperator,
+    DataplexGetTaskOperator,
+    DataplexListTasksOperator,
+    DataplexRunDataProfileScanOperator,
+    DataplexRunDataQualityScanOperator,
+)
+
+KnowledgeCatalogCreateTaskOperator: TypeAlias = DataplexCreateTaskOperator
+KnowledgeCatalogDeleteTaskOperator: TypeAlias = DataplexDeleteTaskOperator
+KnowledgeCatalogListTasksOperator: TypeAlias = DataplexListTasksOperator
+KnowledgeCatalogGetTaskOperator: TypeAlias = DataplexGetTaskOperator
+KnowledgeCatalogCreateLakeOperator: TypeAlias = DataplexCreateLakeOperator
+KnowledgeCatalogDeleteLakeOperator: TypeAlias = DataplexDeleteLakeOperator
+KnowledgeCatalogCreateOrUpdateDataQualityScanOperator: TypeAlias = (
+    DataplexCreateOrUpdateDataQualityScanOperator
+)
+KnowledgeCatalogGetDataQualityScanOperator: TypeAlias = 
DataplexGetDataQualityScanOperator
+KnowledgeCatalogDeleteDataQualityScanOperator: TypeAlias = 
DataplexDeleteDataQualityScanOperator
+KnowledgeCatalogRunDataQualityScanOperator: TypeAlias = 
DataplexRunDataQualityScanOperator
+KnowledgeCatalogGetDataQualityScanResultOperator: TypeAlias = 
DataplexGetDataQualityScanResultOperator
+KnowledgeCatalogCreateOrUpdateDataProfileScanOperator: TypeAlias = (
+    DataplexCreateOrUpdateDataProfileScanOperator
+)
+KnowledgeCatalogGetDataProfileScanOperator: TypeAlias = 
DataplexGetDataProfileScanOperator
+KnowledgeCatalogDeleteDataProfileScanOperator: TypeAlias = 
DataplexDeleteDataProfileScanOperator
+KnowledgeCatalogRunDataProfileScanOperator: TypeAlias = 
DataplexRunDataProfileScanOperator
+KnowledgeCatalogGetDataProfileScanResultOperator: TypeAlias = 
DataplexGetDataProfileScanResultOperator
+KnowledgeCatalogCreateZoneOperator: TypeAlias = DataplexCreateZoneOperator
+KnowledgeCatalogDeleteZoneOperator: TypeAlias = DataplexDeleteZoneOperator
+KnowledgeCatalogCreateAssetOperator: TypeAlias = DataplexCreateAssetOperator
+KnowledgeCatalogDeleteAssetOperator: TypeAlias = DataplexDeleteAssetOperator
+KnowledgeCatalogCreateEntryGroupOperator: TypeAlias = 
DataplexCatalogCreateEntryGroupOperator
+KnowledgeCatalogGetEntryGroupOperator: TypeAlias = 
DataplexCatalogGetEntryGroupOperator
+KnowledgeCatalogDeleteEntryGroupOperator: TypeAlias = 
DataplexCatalogDeleteEntryGroupOperator
+KnowledgeCatalogListEntryGroupsOperator: TypeAlias = 
DataplexCatalogListEntryGroupsOperator
+KnowledgeCatalogUpdateEntryGroupOperator: TypeAlias = 
DataplexCatalogUpdateEntryGroupOperator
+KnowledgeCatalogCreateEntryTypeOperator: TypeAlias = 
DataplexCatalogCreateEntryTypeOperator
+KnowledgeCatalogGetEntryTypeOperator: TypeAlias = 
DataplexCatalogGetEntryTypeOperator
+KnowledgeCatalogDeleteEntryTypeOperator: TypeAlias = 
DataplexCatalogDeleteEntryTypeOperator
+KnowledgeCatalogListEntryTypesOperator: TypeAlias = 
DataplexCatalogListEntryTypesOperator
+KnowledgeCatalogUpdateEntryTypeOperator: TypeAlias = 
DataplexCatalogUpdateEntryTypeOperator
+KnowledgeCatalogCreateAspectTypeOperator: TypeAlias = 
DataplexCatalogCreateAspectTypeOperator
+KnowledgeCatalogGetAspectTypeOperator: TypeAlias = 
DataplexCatalogGetAspectTypeOperator
+KnowledgeCatalogListAspectTypesOperator: TypeAlias = 
DataplexCatalogListAspectTypesOperator
+KnowledgeCatalogUpdateAspectTypeOperator: TypeAlias = 
DataplexCatalogUpdateAspectTypeOperator
+KnowledgeCatalogDeleteAspectTypeOperator: TypeAlias = 
DataplexCatalogDeleteAspectTypeOperator
+KnowledgeCatalogCreateEntryOperator: TypeAlias = 
DataplexCatalogCreateEntryOperator
+KnowledgeCatalogGetEntryOperator: TypeAlias = DataplexCatalogGetEntryOperator
+KnowledgeCatalogListEntriesOperator: TypeAlias = 
DataplexCatalogListEntriesOperator
+KnowledgeCatalogSearchEntriesOperator: TypeAlias = 
DataplexCatalogSearchEntriesOperator
+KnowledgeCatalogLookupEntryOperator: TypeAlias = 
DataplexCatalogLookupEntryOperator
+KnowledgeCatalogUpdateEntryOperator: TypeAlias = 
DataplexCatalogUpdateEntryOperator
+KnowledgeCatalogDeleteEntryOperator: TypeAlias = 
DataplexCatalogDeleteEntryOperator
diff --git 
a/providers/google/src/airflow/providers/google/cloud/operators/looker.py 
b/providers/google/src/airflow/providers/google/cloud/operators/looker.py
index 775dfed31c8..71e1e6897d1 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/looker.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/looker.py
@@ -15,11 +15,11 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""This module contains Google Cloud Looker operators."""
+"""This module contains Google Cloud Looker operators and Data Studio 
aliases."""
 
 from __future__ import annotations
 
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, TypeAlias
 
 from airflow.providers.common.compat.sdk import AirflowException
 from airflow.providers.google.cloud.hooks.looker import LookerHook
@@ -103,3 +103,6 @@ class LookerStartPdtBuildOperator(GoogleCloudBaseOperator):
     def on_kill(self):
         if self.materialization_id and self.cancel_on_kill:
             
self.hook.stop_pdt_build(materialization_id=self.materialization_id)
+
+
+DataStudioStartPdtBuildOperator: TypeAlias = LookerStartPdtBuildOperator
diff --git 
a/providers/google/src/airflow/providers/google/cloud/operators/managed_spark.py
 
b/providers/google/src/airflow/providers/google/cloud/operators/managed_spark.py
new file mode 100644
index 00000000000..6b21ffa69bd
--- /dev/null
+++ 
b/providers/google/src/airflow/providers/google/cloud/operators/managed_spark.py
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TypeAlias
+
+from airflow.providers.google.cloud.operators.dataproc import (
+    DataprocCancelOperationOperator,
+    DataprocCreateBatchOperator,
+    DataprocCreateClusterOperator,
+    DataprocCreateWorkflowTemplateOperator,
+    DataprocDeleteBatchOperator,
+    DataprocDeleteClusterOperator,
+    DataprocDiagnoseClusterOperator,
+    DataprocGetBatchOperator,
+    DataprocInstantiateInlineWorkflowTemplateOperator,
+    DataprocInstantiateWorkflowTemplateOperator,
+    DataprocListBatchesOperator,
+    DataprocStartClusterOperator,
+    DataprocStopClusterOperator,
+    DataprocSubmitJobOperator,
+    DataprocUpdateClusterOperator,
+)
+
+ManagedSparkCreateClusterOperator: TypeAlias = DataprocCreateClusterOperator
+ManagedSparkDeleteClusterOperator: TypeAlias = DataprocDeleteClusterOperator
+ManagedSparkStartClusterOperator: TypeAlias = DataprocStartClusterOperator
+ManagedSparkStopClusterOperator: TypeAlias = DataprocStopClusterOperator
+ManagedSparkCreateWorkflowTemplateOperator: TypeAlias = 
DataprocCreateWorkflowTemplateOperator
+ManagedSparkInstantiateWorkflowTemplateOperator: TypeAlias = 
DataprocInstantiateWorkflowTemplateOperator
+ManagedSparkInstantiateInlineWorkflowTemplateOperator: TypeAlias = (
+    DataprocInstantiateInlineWorkflowTemplateOperator
+)
+ManagedSparkSubmitJobOperator: TypeAlias = DataprocSubmitJobOperator
+ManagedSparkUpdateClusterOperator: TypeAlias = DataprocUpdateClusterOperator
+ManagedSparkDiagnoseClusterOperator: TypeAlias = 
DataprocDiagnoseClusterOperator
+ManagedSparkCreateBatchOperator: TypeAlias = DataprocCreateBatchOperator
+ManagedSparkDeleteBatchOperator: TypeAlias = DataprocDeleteBatchOperator
+ManagedSparkGetBatchOperator: TypeAlias = DataprocGetBatchOperator
+ManagedSparkListBatchesOperator: TypeAlias = DataprocListBatchesOperator
+ManagedSparkCancelOperationOperator: TypeAlias = 
DataprocCancelOperationOperator
diff --git a/providers/google/src/airflow/providers/google/get_provider_info.py 
b/providers/google/src/airflow/providers/google/get_provider_info.py
index c172eee6e87..ec6f630a162 100644
--- a/providers/google/src/airflow/providers/google/get_provider_info.py
+++ b/providers/google/src/airflow/providers/google/get_provider_info.py
@@ -265,12 +265,6 @@ def get_provider_info():
                 "logo": "/docs/integration-logos/Compute-Engine.png",
                 "tags": ["gcp"],
             },
-            {
-                "integration-name": "Google Data Proc",
-                "external-doc-url": "https://cloud.google.com/dataproc/";,
-                "logo": "/docs/integration-logos/Google-Data-Proc.png",
-                "tags": ["gcp"],
-            },
             {
                 "integration-name": "Google Dataflow",
                 "external-doc-url": "https://cloud.google.com/dataflow/";,
@@ -286,7 +280,7 @@ def get_provider_info():
                 "tags": ["gcp"],
             },
             {
-                "integration-name": "Google Dataplex",
+                "integration-name": "Google Knowledge Catalog",
                 "external-doc-url": "https://cloud.google.com/dataplex/";,
                 "how-to-guide": 
["/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst"],
                 "tags": ["gcp"],
@@ -307,7 +301,7 @@ def get_provider_info():
                 "tags": ["gcp"],
             },
             {
-                "integration-name": "Google Dataproc",
+                "integration-name": "Google Managed Service for Apache Spark",
                 "external-doc-url": "https://cloud.google.com/dataproc/";,
                 "how-to-guide": 
["/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst"],
                 "logo": "/docs/integration-logos/Cloud-Dataproc.png",
@@ -442,7 +436,7 @@ def get_provider_info():
                 "tags": ["gcp"],
             },
             {
-                "integration-name": "Google Looker",
+                "integration-name": "Google Data Studio",
                 "external-doc-url": "https://cloud.google.com/looker";,
                 "logo": "/docs/integration-logos/Cloud-Looker.png",
                 "how-to-guide": 
["/docs/apache-airflow-providers-google/operators/cloud/looker.rst"],
@@ -544,8 +538,11 @@ def get_provider_info():
                 "python-modules": 
["airflow.providers.google.cloud.operators.datafusion"],
             },
             {
-                "integration-name": "Google Dataplex",
-                "python-modules": 
["airflow.providers.google.cloud.operators.dataplex"],
+                "integration-name": "Google Knowledge Catalog",
+                "python-modules": [
+                    "airflow.providers.google.cloud.operators.dataplex",
+                    
"airflow.providers.google.cloud.operators.knowledge_catalog",
+                ],
             },
             {
                 "integration-name": "Google Dataprep",
@@ -556,8 +553,11 @@ def get_provider_info():
                 "python-modules": 
["airflow.providers.google.cloud.operators.dataproc_metastore"],
             },
             {
-                "integration-name": "Google Dataproc",
-                "python-modules": 
["airflow.providers.google.cloud.operators.dataproc"],
+                "integration-name": "Google Managed Service for Apache Spark",
+                "python-modules": [
+                    "airflow.providers.google.cloud.operators.dataproc",
+                    "airflow.providers.google.cloud.operators.managed_spark",
+                ],
             },
             {
                 "integration-name": "Google Datastore",
@@ -675,7 +675,7 @@ def get_provider_info():
                 ],
             },
             {
-                "integration-name": "Google Looker",
+                "integration-name": "Google Data Studio",
                 "python-modules": 
["airflow.providers.google.cloud.operators.looker"],
             },
             {
@@ -741,11 +741,11 @@ def get_provider_info():
                 "python-modules": 
["airflow.providers.google.cloud.sensors.dataprep"],
             },
             {
-                "integration-name": "Google Dataplex",
+                "integration-name": "Google Knowledge Catalog",
                 "python-modules": 
["airflow.providers.google.cloud.sensors.dataplex"],
             },
             {
-                "integration-name": "Google Dataproc",
+                "integration-name": "Google Managed Service for Apache Spark",
                 "python-modules": 
["airflow.providers.google.cloud.sensors.dataproc"],
             },
             {
@@ -781,7 +781,7 @@ def get_provider_info():
                 "python-modules": 
["airflow.providers.google.marketing_platform.sensors.display_video"],
             },
             {
-                "integration-name": "Google Looker",
+                "integration-name": "Google Data Studio",
                 "python-modules": 
["airflow.providers.google.cloud.sensors.looker"],
             },
             {
@@ -886,7 +886,7 @@ def get_provider_info():
                 "python-modules": 
["airflow.providers.google.cloud.hooks.datafusion"],
             },
             {
-                "integration-name": "Google Dataplex",
+                "integration-name": "Google Knowledge Catalog",
                 "python-modules": 
["airflow.providers.google.cloud.hooks.dataplex"],
             },
             {
@@ -898,7 +898,7 @@ def get_provider_info():
                 "python-modules": 
["airflow.providers.google.cloud.hooks.dataproc_metastore"],
             },
             {
-                "integration-name": "Google Dataproc",
+                "integration-name": "Google Managed Service for Apache Spark",
                 "python-modules": 
["airflow.providers.google.cloud.hooks.dataproc"],
             },
             {
@@ -1047,7 +1047,7 @@ def get_provider_info():
                 ],
             },
             {
-                "integration-name": "Google Looker",
+                "integration-name": "Google Data Studio",
                 "python-modules": 
["airflow.providers.google.cloud.hooks.looker"],
             },
             {
@@ -1123,11 +1123,11 @@ def get_provider_info():
                 "python-modules": 
["airflow.providers.google.cloud.triggers.datafusion"],
             },
             {
-                "integration-name": "Google Dataplex",
+                "integration-name": "Google Knowledge Catalog",
                 "python-modules": 
["airflow.providers.google.cloud.triggers.dataplex"],
             },
             {
-                "integration-name": "Google Dataproc",
+                "integration-name": "Google Managed Service for Apache Spark",
                 "python-modules": 
["airflow.providers.google.cloud.triggers.dataproc"],
             },
             {
@@ -1556,7 +1556,7 @@ def get_provider_info():
             },
             {
                 "hook-class-name": 
"airflow.providers.google.cloud.hooks.looker.LookerHook",
-                "hook-name": "Google Looker",
+                "hook-name": "Google Data Studio",
                 "connection-type": "gcp_looker",
             },
         ],
diff --git 
a/providers/google/tests/system/google/cloud/dataplex/example_dataplex.py 
b/providers/google/tests/system/google/cloud/dataplex/example_dataplex.py
index efe6d6c8485..0982620b86f 100644
--- a/providers/google/tests/system/google/cloud/dataplex/example_dataplex.py
+++ b/providers/google/tests/system/google/cloud/dataplex/example_dataplex.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example Airflow DAG that shows how to use Dataplex.
+Example Airflow DAG that shows how to use Knowledge Catalog.
 """
 
 from __future__ import annotations
@@ -89,7 +89,7 @@ with DAG(
     DAG_ID,
     start_date=datetime.datetime(2021, 1, 1),
     schedule="@once",
-    tags=["example", "dataplex"],
+    tags=["example", "knowledge-catalog"],
 ) as dag:
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
diff --git 
a/providers/google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
 
b/providers/google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
index 5e1539cb807..3dad30b30e8 100644
--- 
a/providers/google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+++ 
b/providers/google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example Airflow DAG that shows how to use Dataplex Catalog.
+Example Airflow DAG that shows how to use Knowledge Catalog Catalog.
 """
 
 from __future__ import annotations
diff --git 
a/providers/google/tests/system/google/cloud/dataplex/example_dataplex_dp.py 
b/providers/google/tests/system/google/cloud/dataplex/example_dataplex_dp.py
index c88a576ba1b..5d315b67ffc 100644
--- a/providers/google/tests/system/google/cloud/dataplex/example_dataplex_dp.py
+++ b/providers/google/tests/system/google/cloud/dataplex/example_dataplex_dp.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example Airflow DAG that shows how to use Dataplex Scan Data.
+Example Airflow DAG that shows how to use Knowledge Catalog Scan Data.
 """
 
 from __future__ import annotations
@@ -134,7 +134,7 @@ with DAG(
     DAG_ID,
     start_date=datetime(2021, 1, 1),
     schedule="@once",
-    tags=["example", "dataplex", "data_profile"],
+    tags=["example", "knowledge-catalog", "data_profile"],
 ) as dag:
     create_dataset = 
BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET)
     create_table_1 = BigQueryCreateTableOperator(
diff --git 
a/providers/google/tests/system/google/cloud/dataplex/example_dataplex_dq.py 
b/providers/google/tests/system/google/cloud/dataplex/example_dataplex_dq.py
index 0d9956a9e46..b35de66a52f 100644
--- a/providers/google/tests/system/google/cloud/dataplex/example_dataplex_dq.py
+++ b/providers/google/tests/system/google/cloud/dataplex/example_dataplex_dq.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example Airflow DAG that shows how to use Dataplex Scan Data.
+Example Airflow DAG that shows how to use Knowledge Catalog Scan Data.
 """
 
 from __future__ import annotations
@@ -158,7 +158,7 @@ with DAG(
     DAG_ID,
     start_date=datetime(2021, 1, 1),
     schedule="@once",
-    tags=["example", "dataplex", "data_quality"],
+    tags=["example", "knowledge-catalog", "data_quality"],
 ) as dag:
     create_dataset = 
BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET)
     create_table_1 = BigQueryCreateTableOperator(
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_batch.py 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_batch.py
index 9f87cbc91e9..a3fdb3c7dfa 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_batch.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_batch.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example Airflow DAG for Dataproc batch operators.
+Example Airflow DAG for Managed Spark batch operators.
 """
 
 from __future__ import annotations
@@ -66,7 +66,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "managed-spark"],
 ) as dag:
     # [START how_to_cloud_dataproc_create_batch_operator]
     create_batch = DataprocCreateBatchOperator(
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_batch_deferrable.py
 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_batch_deferrable.py
index 0c8ee3cbe4c..444f90edb38 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_batch_deferrable.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_batch_deferrable.py
@@ -60,7 +60,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc", "batch", "deferrable"],
+    tags=["example", "managed-spark", "batch", "deferrable"],
 ) as dag:
     # [START how_to_cloud_dataproc_create_batch_operator_async]
     create_batch = DataprocCreateBatchOperator(
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_batch_persistent.py
 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_batch_persistent.py
index 59a55075ffe..c13f4e0062c 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_batch_persistent.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_batch_persistent.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example Airflow DAG for Dataproc batch operators.
+Example Airflow DAG for Managed Spark batch operators.
 """
 
 from __future__ import annotations
@@ -84,7 +84,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc", "batch", "persistent"],
+    tags=["example", "managed-spark", "batch", "persistent"],
 ) as dag:
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_create_existing_stopped_cluster.py
 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_create_existing_stopped_cluster.py
index 121c2f2c52e..a9bb93be98e 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_create_existing_stopped_cluster.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_create_existing_stopped_cluster.py
@@ -68,7 +68,11 @@ CLUSTER_CONFIG = {
 }
 
 with DAG(
-    DAG_ID, schedule="@once", start_date=datetime(2024, 1, 1), catchup=False, 
tags=["dataproc", "example"]
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2024, 1, 1),
+    catchup=False,
+    tags=["managed-spark", "example"],
 ) as dag:
     create_cluster = DataprocCreateClusterOperator(
         task_id="create_cluster",
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
index a77e15723c2..94fd0b2264d 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
@@ -93,7 +93,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc", "deferrable"],
+    tags=["example", "managed-spark", "deferrable"],
 ) as dag:
     # [START how_to_cloud_dataproc_create_cluster_operator_async]
     create_cluster = DataprocCreateClusterOperator(
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_diagnose.py
 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_diagnose.py
index 91d688c5c36..d1bb17ad4da 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_diagnose.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_diagnose.py
@@ -72,7 +72,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc", "diagnose", "cluster"],
+    tags=["example", "managed-spark", "diagnose", "cluster"],
 ) as dag:
     create_cluster = DataprocCreateClusterOperator(
         task_id="create_cluster",
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_generator.py
 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_generator.py
index c83190d2444..d7c639a1af0 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_generator.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_generator.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example Airflow DAG testing Dataproc
+Example Airflow DAG testing Managed Spark
 operators for managing a cluster and submitting jobs.
 """
 
@@ -92,7 +92,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "managed-spark"],
 ) as dag:
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_start_stop.py
 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_start_stop.py
index 0a8b28b18b8..e21f0195c2c 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_start_stop.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_start_stop.py
@@ -67,7 +67,11 @@ CLUSTER_CONFIG = {
 }
 
 with DAG(
-    DAG_ID, schedule="@once", start_date=datetime(2024, 1, 1), catchup=False, 
tags=["dataproc", "example"]
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2024, 1, 1),
+    catchup=False,
+    tags=["managed-spark", "example"],
 ) as dag:
     create_cluster = DataprocCreateClusterOperator(
         task_id="create_cluster",
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_update.py
 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_update.py
index 3747c9f206d..a9961901b84 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_update.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_cluster_update.py
@@ -83,7 +83,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "managed-spark"],
 ) as dag:
     create_cluster = DataprocCreateClusterOperator(
         task_id="create_cluster",
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_flink.py 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_flink.py
index d852ec5c1ad..fc95751fe45 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_flink.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_flink.py
@@ -88,7 +88,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc", "hadoop"],
+    tags=["example", "managed-spark", "hadoop"],
 ) as dag:
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_gke.py 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_gke.py
index 560c4c63c48..cabf8a517f0 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_gke.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_gke.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example Airflow DAG that show how to create a Dataproc cluster in Google 
Kubernetes Engine.
+Example Airflow DAG that show how to create a Managed Spark cluster in Google 
Kubernetes Engine.
 
 Required environment variables:
 GKE_NAMESPACE = os.environ.get("GKE_NAMESPACE", f"{CLUSTER_NAME}")
@@ -102,7 +102,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc", "gke"],
+    tags=["example", "managed-spark", "gke"],
 ) as dag:
     create_gke_cluster = GKECreateClusterOperator(
         task_id="create_gke_cluster",
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_hadoop.py
 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_hadoop.py
index a3368a7bd75..0b20a54f190 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_hadoop.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_hadoop.py
@@ -87,7 +87,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc", "hadoop"],
+    tags=["example", "managed-spark", "hadoop"],
 ) as dag:
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_hive.py 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_hive.py
index 2a49be57411..83d7ae73567 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_hive.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_hive.py
@@ -95,7 +95,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc", "hive"],
+    tags=["example", "managed-spark", "hive"],
 ) as dag:
     # [START how_to_cloud_dataproc_create_cluster_operator]
     create_cluster = DataprocCreateClusterOperator(
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_pig.py 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_pig.py
index 712681cdea6..fc89a5f5e36 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_pig.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_pig.py
@@ -83,7 +83,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc", "pig"],
+    tags=["example", "managed-spark", "pig"],
 ) as dag:
     create_cluster = DataprocCreateClusterOperator(
         task_id="create_cluster",
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_presto.py
 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_presto.py
index 377f4aafbc8..71ea5d653ba 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_presto.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_presto.py
@@ -87,7 +87,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc", "presto"],
+    tags=["example", "managed-spark", "presto"],
 ) as dag:
     create_cluster = DataprocCreateClusterOperator(
         task_id="create_cluster",
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_pyspark.py
 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_pyspark.py
index 1024c888707..c6c898bbb00 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_pyspark.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_pyspark.py
@@ -93,7 +93,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc", "pyspark"],
+    tags=["example", "managed-spark", "pyspark"],
 ) as dag:
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_spark.py 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_spark.py
index 03d963d5d17..631a25f1064 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_spark.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_spark.py
@@ -83,7 +83,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc", "spark"],
+    tags=["example", "managed-spark", "spark"],
 ) as dag:
     create_cluster = DataprocCreateClusterOperator(
         task_id="create_cluster",
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_spark_async.py
 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_spark_async.py
index 32bc96b0322..ddf9554cae8 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_spark_async.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_spark_async.py
@@ -82,7 +82,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc", "spark", "async"],
+    tags=["example", "managed-spark", "spark", "async"],
 ) as dag:
     create_cluster = DataprocCreateClusterOperator(
         task_id="create_cluster",
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_spark_deferrable.py
 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_spark_deferrable.py
index 3f0433be201..cb19117b7a3 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_spark_deferrable.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_spark_deferrable.py
@@ -84,7 +84,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc", "spark", "deferrable"],
+    tags=["example", "managed-spark", "spark", "deferrable"],
 ) as dag:
     create_cluster = DataprocCreateClusterOperator(
         task_id="create_cluster",
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_spark_sql.py
 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_spark_sql.py
index a1a60059b4d..26fdbe1e102 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_spark_sql.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_spark_sql.py
@@ -80,7 +80,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "managed-spark"],
 ) as dag:
     create_cluster = DataprocCreateClusterOperator(
         task_id="create_cluster",
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_sparkr.py
 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_sparkr.py
index dc2ff3c580f..8c4c7270b23 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_sparkr.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_sparkr.py
@@ -91,7 +91,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc", "sparkr"],
+    tags=["example", "managed-spark", "sparkr"],
 ) as dag:
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_start_from_trigger.py
 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_start_from_trigger.py
index 23f1d93cd9e..6071854f2d3 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_start_from_trigger.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_start_from_trigger.py
@@ -81,7 +81,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc", "start_from_trigger"],
+    tags=["example", "managed-spark", "start_from_trigger"],
 ) as dag:
     create_cluster = DataprocCreateClusterOperator(
         task_id="create_cluster",
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_trino.py 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_trino.py
index 2cc32ea52d9..3bbbbf089f2 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_trino.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_trino.py
@@ -89,7 +89,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc", "trino"],
+    tags=["example", "managed-spark", "trino"],
 ) as dag:
     create_cluster = DataprocCreateClusterOperator(
         task_id="create_cluster",
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_workflow.py
 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_workflow.py
index a0a578ec337..7ba18427afd 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_workflow.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_workflow.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example Airflow DAG for Dataproc workflow operators.
+Example Airflow DAG for Managed Spark workflow operators.
 """
 
 from __future__ import annotations
@@ -72,7 +72,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc", "workflow"],
+    tags=["example", "managed-spark", "workflow"],
 ) as dag:
     # [START how_to_cloud_dataproc_create_workflow_template]
     create_workflow_template = DataprocCreateWorkflowTemplateOperator(
diff --git 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
index 5a69ca4a80e..8c34ba4bfe4 100644
--- 
a/providers/google/tests/system/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
+++ 
b/providers/google/tests/system/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example Airflow DAG for Dataproc workflow operators.
+Example Airflow DAG for Managed Spark workflow operators.
 """
 
 from __future__ import annotations
@@ -71,7 +71,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc", "workflow", "deferrable"],
+    tags=["example", "managed-spark", "workflow", "deferrable"],
 ) as dag:
     create_workflow_template = DataprocCreateWorkflowTemplateOperator(
         task_id="create_workflow_template",
diff --git 
a/providers/google/tests/system/google/cloud/looker/example_looker.py 
b/providers/google/tests/system/google/cloud/looker/example_looker.py
index de746effb6c..32287b3a32c 100644
--- a/providers/google/tests/system/google/cloud/looker/example_looker.py
+++ b/providers/google/tests/system/google/cloud/looker/example_looker.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example Airflow DAG that show how to use various Looker
+Example Airflow DAG that show how to use various Data Studio
 operators to submit PDT materialization job and manage it.
 """
 
@@ -38,7 +38,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "looker"],
+    tags=["example", "data-studio"],
 ) as dag:
     # [START cloud_looker_async_start_pdt_sensor]
     start_pdt_task_async = LookerStartPdtBuildOperator(
diff --git 
a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py 
b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
index 164580b6c7f..ecaeae1ca3b 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
@@ -85,6 +85,7 @@ cluster_params = 
inspect.signature(ClusterGenerator.__init__).parameters
 DATAPROC_PATH = "airflow.providers.google.cloud.operators.dataproc.{}"
 DATAPROC_TRIGGERS_PATH = "airflow.providers.google.cloud.triggers.dataproc.{}"
 
+
 TASK_ID = "task-id"
 GCP_PROJECT = "test-project"
 GCP_REGION = "test-location"
diff --git 
a/providers/google/tests/unit/google/cloud/operators/test_knowledge_catalog.py 
b/providers/google/tests/unit/google/cloud/operators/test_knowledge_catalog.py
new file mode 100644
index 00000000000..a1636554197
--- /dev/null
+++ 
b/providers/google/tests/unit/google/cloud/operators/test_knowledge_catalog.py
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.providers.google.cloud.operators.dataplex import (
+    DataplexCatalogCreateAspectTypeOperator,
+    DataplexCatalogCreateEntryGroupOperator,
+    DataplexCatalogCreateEntryOperator,
+    DataplexCatalogCreateEntryTypeOperator,
+    DataplexCatalogDeleteAspectTypeOperator,
+    DataplexCatalogDeleteEntryGroupOperator,
+    DataplexCatalogDeleteEntryOperator,
+    DataplexCatalogDeleteEntryTypeOperator,
+    DataplexCatalogGetAspectTypeOperator,
+    DataplexCatalogGetEntryGroupOperator,
+    DataplexCatalogGetEntryOperator,
+    DataplexCatalogGetEntryTypeOperator,
+    DataplexCatalogListAspectTypesOperator,
+    DataplexCatalogListEntriesOperator,
+    DataplexCatalogListEntryGroupsOperator,
+    DataplexCatalogListEntryTypesOperator,
+    DataplexCatalogLookupEntryOperator,
+    DataplexCatalogSearchEntriesOperator,
+    DataplexCatalogUpdateAspectTypeOperator,
+    DataplexCatalogUpdateEntryGroupOperator,
+    DataplexCatalogUpdateEntryOperator,
+    DataplexCatalogUpdateEntryTypeOperator,
+    DataplexCreateAssetOperator,
+    DataplexCreateLakeOperator,
+    DataplexCreateOrUpdateDataProfileScanOperator,
+    DataplexCreateOrUpdateDataQualityScanOperator,
+    DataplexCreateTaskOperator,
+    DataplexCreateZoneOperator,
+    DataplexDeleteAssetOperator,
+    DataplexDeleteDataProfileScanOperator,
+    DataplexDeleteDataQualityScanOperator,
+    DataplexDeleteLakeOperator,
+    DataplexDeleteTaskOperator,
+    DataplexDeleteZoneOperator,
+    DataplexGetDataProfileScanOperator,
+    DataplexGetDataProfileScanResultOperator,
+    DataplexGetDataQualityScanOperator,
+    DataplexGetDataQualityScanResultOperator,
+    DataplexGetTaskOperator,
+    DataplexListTasksOperator,
+    DataplexRunDataProfileScanOperator,
+    DataplexRunDataQualityScanOperator,
+)
+from airflow.providers.google.cloud.operators.knowledge_catalog import (
+    KnowledgeCatalogCreateAspectTypeOperator,
+    KnowledgeCatalogCreateAssetOperator,
+    KnowledgeCatalogCreateEntryGroupOperator,
+    KnowledgeCatalogCreateEntryOperator,
+    KnowledgeCatalogCreateEntryTypeOperator,
+    KnowledgeCatalogCreateLakeOperator,
+    KnowledgeCatalogCreateOrUpdateDataProfileScanOperator,
+    KnowledgeCatalogCreateOrUpdateDataQualityScanOperator,
+    KnowledgeCatalogCreateTaskOperator,
+    KnowledgeCatalogCreateZoneOperator,
+    KnowledgeCatalogDeleteAspectTypeOperator,
+    KnowledgeCatalogDeleteAssetOperator,
+    KnowledgeCatalogDeleteDataProfileScanOperator,
+    KnowledgeCatalogDeleteDataQualityScanOperator,
+    KnowledgeCatalogDeleteEntryGroupOperator,
+    KnowledgeCatalogDeleteEntryOperator,
+    KnowledgeCatalogDeleteEntryTypeOperator,
+    KnowledgeCatalogDeleteLakeOperator,
+    KnowledgeCatalogDeleteTaskOperator,
+    KnowledgeCatalogDeleteZoneOperator,
+    KnowledgeCatalogGetAspectTypeOperator,
+    KnowledgeCatalogGetDataProfileScanOperator,
+    KnowledgeCatalogGetDataProfileScanResultOperator,
+    KnowledgeCatalogGetDataQualityScanOperator,
+    KnowledgeCatalogGetDataQualityScanResultOperator,
+    KnowledgeCatalogGetEntryGroupOperator,
+    KnowledgeCatalogGetEntryOperator,
+    KnowledgeCatalogGetEntryTypeOperator,
+    KnowledgeCatalogGetTaskOperator,
+    KnowledgeCatalogListAspectTypesOperator,
+    KnowledgeCatalogListEntriesOperator,
+    KnowledgeCatalogListEntryGroupsOperator,
+    KnowledgeCatalogListEntryTypesOperator,
+    KnowledgeCatalogListTasksOperator,
+    KnowledgeCatalogLookupEntryOperator,
+    KnowledgeCatalogRunDataProfileScanOperator,
+    KnowledgeCatalogRunDataQualityScanOperator,
+    KnowledgeCatalogSearchEntriesOperator,
+    KnowledgeCatalogUpdateAspectTypeOperator,
+    KnowledgeCatalogUpdateEntryGroupOperator,
+    KnowledgeCatalogUpdateEntryOperator,
+    KnowledgeCatalogUpdateEntryTypeOperator,
+)
+
+
[email protected](
+    ("alias_operator", "original_operator"),
+    [
+        (KnowledgeCatalogCreateTaskOperator, DataplexCreateTaskOperator),
+        (KnowledgeCatalogDeleteTaskOperator, DataplexDeleteTaskOperator),
+        (KnowledgeCatalogListTasksOperator, DataplexListTasksOperator),
+        (KnowledgeCatalogGetTaskOperator, DataplexGetTaskOperator),
+        (KnowledgeCatalogCreateLakeOperator, DataplexCreateLakeOperator),
+        (KnowledgeCatalogDeleteLakeOperator, DataplexDeleteLakeOperator),
+        (
+            KnowledgeCatalogCreateOrUpdateDataQualityScanOperator,
+            DataplexCreateOrUpdateDataQualityScanOperator,
+        ),
+        (KnowledgeCatalogGetDataQualityScanOperator, 
DataplexGetDataQualityScanOperator),
+        (KnowledgeCatalogDeleteDataQualityScanOperator, 
DataplexDeleteDataQualityScanOperator),
+        (KnowledgeCatalogRunDataQualityScanOperator, 
DataplexRunDataQualityScanOperator),
+        (
+            KnowledgeCatalogGetDataQualityScanResultOperator,
+            DataplexGetDataQualityScanResultOperator,
+        ),
+        (
+            KnowledgeCatalogCreateOrUpdateDataProfileScanOperator,
+            DataplexCreateOrUpdateDataProfileScanOperator,
+        ),
+        (KnowledgeCatalogGetDataProfileScanOperator, 
DataplexGetDataProfileScanOperator),
+        (KnowledgeCatalogDeleteDataProfileScanOperator, 
DataplexDeleteDataProfileScanOperator),
+        (KnowledgeCatalogRunDataProfileScanOperator, 
DataplexRunDataProfileScanOperator),
+        (
+            KnowledgeCatalogGetDataProfileScanResultOperator,
+            DataplexGetDataProfileScanResultOperator,
+        ),
+        (KnowledgeCatalogCreateZoneOperator, DataplexCreateZoneOperator),
+        (KnowledgeCatalogDeleteZoneOperator, DataplexDeleteZoneOperator),
+        (KnowledgeCatalogCreateAssetOperator, DataplexCreateAssetOperator),
+        (KnowledgeCatalogDeleteAssetOperator, DataplexDeleteAssetOperator),
+        (KnowledgeCatalogCreateEntryGroupOperator, 
DataplexCatalogCreateEntryGroupOperator),
+        (KnowledgeCatalogGetEntryGroupOperator, 
DataplexCatalogGetEntryGroupOperator),
+        (KnowledgeCatalogDeleteEntryGroupOperator, 
DataplexCatalogDeleteEntryGroupOperator),
+        (KnowledgeCatalogListEntryGroupsOperator, 
DataplexCatalogListEntryGroupsOperator),
+        (KnowledgeCatalogUpdateEntryGroupOperator, 
DataplexCatalogUpdateEntryGroupOperator),
+        (KnowledgeCatalogCreateEntryTypeOperator, 
DataplexCatalogCreateEntryTypeOperator),
+        (KnowledgeCatalogGetEntryTypeOperator, 
DataplexCatalogGetEntryTypeOperator),
+        (KnowledgeCatalogDeleteEntryTypeOperator, 
DataplexCatalogDeleteEntryTypeOperator),
+        (KnowledgeCatalogListEntryTypesOperator, 
DataplexCatalogListEntryTypesOperator),
+        (KnowledgeCatalogUpdateEntryTypeOperator, 
DataplexCatalogUpdateEntryTypeOperator),
+        (KnowledgeCatalogCreateAspectTypeOperator, 
DataplexCatalogCreateAspectTypeOperator),
+        (KnowledgeCatalogGetAspectTypeOperator, 
DataplexCatalogGetAspectTypeOperator),
+        (KnowledgeCatalogListAspectTypesOperator, 
DataplexCatalogListAspectTypesOperator),
+        (KnowledgeCatalogUpdateAspectTypeOperator, 
DataplexCatalogUpdateAspectTypeOperator),
+        (KnowledgeCatalogDeleteAspectTypeOperator, 
DataplexCatalogDeleteAspectTypeOperator),
+        (KnowledgeCatalogCreateEntryOperator, 
DataplexCatalogCreateEntryOperator),
+        (KnowledgeCatalogGetEntryOperator, DataplexCatalogGetEntryOperator),
+        (KnowledgeCatalogListEntriesOperator, 
DataplexCatalogListEntriesOperator),
+        (KnowledgeCatalogSearchEntriesOperator, 
DataplexCatalogSearchEntriesOperator),
+        (KnowledgeCatalogLookupEntryOperator, 
DataplexCatalogLookupEntryOperator),
+        (KnowledgeCatalogUpdateEntryOperator, 
DataplexCatalogUpdateEntryOperator),
+        (KnowledgeCatalogDeleteEntryOperator, 
DataplexCatalogDeleteEntryOperator),
+    ],
+)
+def test_knowledge_catalog_aliases(alias_operator, original_operator):
+    assert alias_operator is original_operator
diff --git a/providers/google/tests/unit/google/cloud/operators/test_looker.py 
b/providers/google/tests/unit/google/cloud/operators/test_looker.py
index 91f2387a64a..6985fd6ec77 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_looker.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_looker.py
@@ -23,7 +23,10 @@ import pytest
 
 from airflow.models import DAG, DagBag
 from airflow.providers.common.compat.sdk import AirflowException
-from airflow.providers.google.cloud.operators.looker import 
LookerStartPdtBuildOperator
+from airflow.providers.google.cloud.operators.looker import (
+    DataStudioStartPdtBuildOperator,
+    LookerStartPdtBuildOperator,
+)
 from airflow.utils.timezone import datetime
 
 from tests_common.test_utils.db import clear_db_runs, clear_db_xcom
@@ -40,6 +43,10 @@ DEFAULT_DATE = datetime(2020, 1, 1)
 TEST_JOB_ID = "123"
 
 
+def test_data_studio_aliases():
+    assert DataStudioStartPdtBuildOperator is LookerStartPdtBuildOperator
+
+
 class LookerTestBase:
     @classmethod
     def setUpClass(cls):
diff --git 
a/providers/google/tests/unit/google/cloud/operators/test_managed_spark.py 
b/providers/google/tests/unit/google/cloud/operators/test_managed_spark.py
new file mode 100644
index 00000000000..a1296125db2
--- /dev/null
+++ b/providers/google/tests/unit/google/cloud/operators/test_managed_spark.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.providers.google.cloud.operators.dataproc import (
+    DataprocCreateBatchOperator,
+    DataprocCreateClusterOperator,
+    DataprocDeleteClusterOperator,
+    DataprocSubmitJobOperator,
+)
+from airflow.providers.google.cloud.operators.managed_spark import (
+    ManagedSparkCreateBatchOperator,
+    ManagedSparkCreateClusterOperator,
+    ManagedSparkDeleteClusterOperator,
+    ManagedSparkSubmitJobOperator,
+)
+
+
+def test_managed_spark_aliases():
+    assert ManagedSparkCreateClusterOperator is DataprocCreateClusterOperator
+    assert ManagedSparkDeleteClusterOperator is DataprocDeleteClusterOperator
+    assert ManagedSparkSubmitJobOperator is DataprocSubmitJobOperator
+    assert ManagedSparkCreateBatchOperator is DataprocCreateBatchOperator

Reply via email to