This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 838e8e8acfd Add operators for Google BidManager API (#62521)
838e8e8acfd is described below

commit 838e8e8acfd668cbc726ef3776cb15a2606d0d86
Author: Nitochkin <[email protected]>
AuthorDate: Tue Mar 10 19:11:05 2026 +0100

    Add operators for Google BidManager API (#62521)
    
    Co-authored-by: Anton Nitochkin <[email protected]>
---
 .../operators/marketing_platform/bid_manager.rst   | 119 +++++++
 .../operators/marketing_platform/display_video.rst |   1 -
 providers/google/provider.yaml                     |  15 +
 .../airflow/providers/google/get_provider_info.py  |  21 ++
 .../google/marketing_platform/hooks/bid_manager.py | 126 ++++++++
 .../marketing_platform/operators/bid_manager.py    | 347 +++++++++++++++++++++
 .../sensors/{display_video.py => bid_manager.py}   |  49 ++-
 .../marketing_platform/sensors/display_video.py    |   2 +-
 .../marketing_platform/example_bid_manager.py      | 214 +++++++++++++
 .../marketing_platform/hooks/test_bid_manager.py   | 145 +++++++++
 .../operators/test_bid_manager.py                  | 257 +++++++++++++++
 .../marketing_platform/sensors/test_bid_manager.py |  44 +++
 12 files changed, 1313 insertions(+), 27 deletions(-)

diff --git a/providers/google/docs/operators/marketing_platform/bid_manager.rst b/providers/google/docs/operators/marketing_platform/bid_manager.rst
new file mode 100644
index 00000000000..c29e2687ed5
--- /dev/null
+++ b/providers/google/docs/operators/marketing_platform/bid_manager.rst
@@ -0,0 +1,119 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Google Bid Manager API Operators
+=======================================
+`Google Bid Manager API <https://developers.google.com/bid-manager/>`__ is a programmatic interface for the Display & Video 360 reporting feature.
+It lets users build and run report queries, and download the resulting report file.
+
+Prerequisite Tasks
+^^^^^^^^^^^^^^^^^^
+
+.. include:: /operators/_partials/prerequisite_tasks.rst
+
+.. _howto/operator:GoogleBidManagerCreateQueryOperator:
+
+Creating a Query
+^^^^^^^^^^^^^^^^
+
+To create a query using Bid Manager, use
+:class:`~airflow.providers.google.marketing_platform.operators.bid_manager.GoogleBidManagerCreateQueryOperator`.
+
+.. exampleinclude:: /../../google/tests/system/google/marketing_platform/example_bid_manager.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_google_bid_manager_create_query_operator]
+    :end-before: [END howto_google_bid_manager_create_query_operator]
+
+Use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.bid_manager.GoogleBidManagerCreateQueryOperator`
+parameters which allow you to dynamically determine values. You can provide body definition using ``.json`` file
+as this operator supports this template extension.
+The result is saved to :ref:`XCom <concepts:xcom>`, which allows the result to be used by other operators.
+
+.. _howto/operator:GoogleBidManagerRunQueryOperator:
+
+Run Query
+^^^^^^^^^
+
+To run a query using Bid Manager, use
+:class:`~airflow.providers.google.marketing_platform.operators.bid_manager.GoogleBidManagerRunQueryOperator`.
+
+.. exampleinclude:: /../../google/tests/system/google/marketing_platform/example_bid_manager.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_google_bid_manager_run_query_report_operator]
+    :end-before: [END howto_google_bid_manager_run_query_report_operator]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.bid_manager.GoogleBidManagerRunQueryOperator`
+parameters which allow you to dynamically determine values.
+The result is saved to :ref:`XCom <concepts:xcom>`, which allows the result to be used by other operators.
+
+.. _howto/operator:GoogleBidManagerDeleteQueryOperator:
+
+Deleting a Query
+^^^^^^^^^^^^^^^^
+
+To delete a query using Bid Manager, use
+:class:`~airflow.providers.google.marketing_platform.operators.bid_manager.GoogleBidManagerDeleteQueryOperator`.
+
+.. exampleinclude:: /../../google/tests/system/google/marketing_platform/example_bid_manager.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_google_bid_manager_delete_query_operator]
+    :end-before: [END howto_google_bid_manager_delete_query_operator]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.bid_manager.GoogleBidManagerDeleteQueryOperator`
+parameters which allow you to dynamically determine values.
+
+.. _howto/operator:GoogleBidManagerRunQuerySensor:
+
+Waiting for query
+^^^^^^^^^^^^^^^^^
+
+To wait for the report use
+:class:`~airflow.providers.google.marketing_platform.sensors.bid_manager.GoogleBidManagerRunQuerySensor`.
+
+.. exampleinclude:: /../../google/tests/system/google/marketing_platform/example_bid_manager.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_google_bid_manager_wait_run_query_sensor]
+    :end-before: [END howto_google_bid_manager_wait_run_query_sensor]
+
+Use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.sensors.bid_manager.GoogleBidManagerRunQuerySensor`
+parameters which allow you to dynamically determine values.
+
+.. _howto/operator:GoogleBidManagerDownloadReportOperator:
+
+Downloading a report
+^^^^^^^^^^^^^^^^^^^^
+
+To download a report to GCS bucket use
+:class:`~airflow.providers.google.marketing_platform.operators.bid_manager.GoogleBidManagerDownloadReportOperator`.
+
+.. exampleinclude:: /../../google/tests/system/google/marketing_platform/example_bid_manager.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_google_bid_manager_get_report_operator]
+    :end-before: [END howto_google_bid_manager_get_report_operator]
+
+Use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.bid_manager.GoogleBidManagerDownloadReportOperator`
+parameters which allow you to dynamically determine values.
diff --git a/providers/google/docs/operators/marketing_platform/display_video.rst b/providers/google/docs/operators/marketing_platform/display_video.rst
index 04cc9acbd04..7ce014303ab 100644
--- a/providers/google/docs/operators/marketing_platform/display_video.rst
+++ b/providers/google/docs/operators/marketing_platform/display_video.rst
@@ -43,7 +43,6 @@ Use :ref:`Jinja templating <concepts:jinja-templating>` with
 
:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360CreateSDFDownloadTaskOperator`
 parameters which allow you to dynamically determine values.
 
-
 .. _howto/operator:GoogleDisplayVideo360SDFtoGCSOperator:
 
 Save SDF files in the Google Cloud Storage
diff --git a/providers/google/provider.yaml b/providers/google/provider.yaml
index c12c3e80d29..04541d5569d 100644
--- a/providers/google/provider.yaml
+++ b/providers/google/provider.yaml
@@ -464,6 +464,12 @@ integrations:
     how-to-guide:
       - /docs/apache-airflow-providers-google/operators/cloud/ray.rst
     tags: [gcp]
+  - integration-name: Google Bid Manager API
+    external-doc-url: https://developers.google.com/bid-manager
+    logo: /docs/integration-logos/Google-Search-Ads360.png
+    how-to-guide:
+      - 
/docs/apache-airflow-providers-google/operators/marketing_platform/bid_manager.rst
+    tags: [gmp]
 
 operators:
   - integration-name: Google Ads
@@ -629,6 +635,9 @@ operators:
   - integration-name: Google Ray
     python-modules:
       - airflow.providers.google.cloud.operators.ray
+  - integration-name: Google Bid Manager API
+    python-modules:
+      - airflow.providers.google.marketing_platform.operators.bid_manager
 
 sensors:
   - integration-name: Google BigQuery
@@ -694,6 +703,9 @@ sensors:
   - integration-name: Google Cloud Tasks
     python-modules:
       - airflow.providers.google.cloud.sensors.tasks
+  - integration-name: Google Bid Manager API
+    python-modules:
+      - airflow.providers.google.marketing_platform.sensors.bid_manager
 
 filesystems:
   - airflow.providers.google.cloud.fs.gcs
@@ -913,6 +925,9 @@ hooks:
   - integration-name: Google Ray
     python-modules:
       - airflow.providers.google.cloud.hooks.ray
+  - integration-name: Google Bid Manager API
+    python-modules:
+      - airflow.providers.google.marketing_platform.hooks.bid_manager
 
 bundles:
   - integration-name: Google Cloud Storage (GCS)
diff --git a/providers/google/src/airflow/providers/google/get_provider_info.py b/providers/google/src/airflow/providers/google/get_provider_info.py
index cab93aa57c9..fec76eb9d5f 100644
--- a/providers/google/src/airflow/providers/google/get_provider_info.py
+++ b/providers/google/src/airflow/providers/google/get_provider_info.py
@@ -473,6 +473,15 @@ def get_provider_info():
                 "how-to-guide": 
["/docs/apache-airflow-providers-google/operators/cloud/ray.rst"],
                 "tags": ["gcp"],
             },
+            {
+                "integration-name": "Google Bid Manager API",
+                "external-doc-url": "https://developers.google.com/bid-manager",
+                "logo": "/docs/integration-logos/Google-Search-Ads360.png",
+                "how-to-guide": [
+                    
"/docs/apache-airflow-providers-google/operators/marketing_platform/bid_manager.rst"
+                ],
+                "tags": ["gmp"],
+            },
         ],
         "operators": [
             {
@@ -694,6 +703,10 @@ def get_provider_info():
                 "integration-name": "Google Ray",
                 "python-modules": 
["airflow.providers.google.cloud.operators.ray"],
             },
+            {
+                "integration-name": "Google Bid Manager API",
+                "python-modules": 
["airflow.providers.google.marketing_platform.operators.bid_manager"],
+            },
         ],
         "sensors": [
             {
@@ -780,6 +793,10 @@ def get_provider_info():
                 "integration-name": "Google Cloud Tasks",
                 "python-modules": 
["airflow.providers.google.cloud.sensors.tasks"],
             },
+            {
+                "integration-name": "Google Bid Manager API",
+                "python-modules": 
["airflow.providers.google.marketing_platform.sensors.bid_manager"],
+            },
         ],
         "filesystems": ["airflow.providers.google.cloud.fs.gcs"],
         "asset-uris": [
@@ -1062,6 +1079,10 @@ def get_provider_info():
                 "integration-name": "Google Ray",
                 "python-modules": ["airflow.providers.google.cloud.hooks.ray"],
             },
+            {
+                "integration-name": "Google Bid Manager API",
+                "python-modules": 
["airflow.providers.google.marketing_platform.hooks.bid_manager"],
+            },
         ],
         "bundles": [
             {
diff --git a/providers/google/src/airflow/providers/google/marketing_platform/hooks/bid_manager.py b/providers/google/src/airflow/providers/google/marketing_platform/hooks/bid_manager.py
new file mode 100644
index 00000000000..4722a7720ae
--- /dev/null
+++ b/providers/google/src/airflow/providers/google/marketing_platform/hooks/bid_manager.py
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google Bid Manager API hook."""
+
+from __future__ import annotations
+
+from collections.abc import Sequence
+from typing import Any
+
+from googleapiclient.discovery import Resource, build
+
+from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+
+
+class GoogleBidManagerHook(GoogleBaseHook):
+    """Hook for Google Bid Manager API."""
+
+    _conn: Resource | None = None
+
+    def __init__(
+        self,
+        api_version: str = "v2",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            impersonation_chain=impersonation_chain,
+            **kwargs,
+        )
+        self.api_version = api_version
+
+    def get_conn(self) -> Resource:
+        """Retrieve connection to Bid Manager API."""
+        if not self._conn:
+            http_authorized = self._authorize()
+            self._conn = build(
+                "doubleclickbidmanager",
+                self.api_version,
+                http=http_authorized,
+                cache_discovery=False,
+            )
+        return self._conn
+
+    def create_query(self, query: dict[str, Any]) -> dict:
+        """
+        Create a query.
+
+        :param query: Query object to be passed to request body.
+        """
+        response = 
self.get_conn().queries().create(body=query).execute(num_retries=self.num_retries)
+        return response
+
+    def delete_query(self, query_id: str) -> None:
+        """
+        Delete a stored query as well as the associated stored reports.
+
+        :param query_id: Query ID to delete.
+        """
+        
self.get_conn().queries().delete(queryId=query_id).execute(num_retries=self.num_retries)
+
+    def get_query(self, query_id: str) -> dict:
+        """
+        Retrieve a stored query.
+
+        :param query_id: Query ID to retrieve.
+        """
+        response = 
self.get_conn().queries().get(queryId=query_id).execute(num_retries=self.num_retries)
+        return response
+
+    def list_queries(self) -> list[dict]:
+        """Retrieve stored queries."""
+        response = 
self.get_conn().queries().list().execute(num_retries=self.num_retries)
+        return response.get("queries", [])
+
+    def run_query(self, query_id: str, params: dict[str, Any] | None) -> dict:
+        """
+        Run a stored query to generate a report.
+
+        :param query_id: Query ID to run.
+        :param params: Parameters for the report.
+        """
+        return (
+            self.get_conn().queries().run(queryId=query_id, 
body=params).execute(num_retries=self.num_retries)
+        )
+
+    def get_report(self, query_id: str, report_id: str) -> dict:
+        """
+        Retrieve a report.
+
+        :param query_id: Query ID for which report was generated.
+        :param report_id: Report ID to retrieve.
+        """
+        return (
+            self.get_conn()
+            .queries()
+            .reports()
+            .get(queryId=query_id, reportId=report_id)
+            .execute(num_retries=self.num_retries)
+        )
+
+    def list_reports(self, query_id: str) -> dict:
+        """
+        Retrieve a list of reports.
+
+        :param query_id: Query ID for which report was generated.
+        """
+        return (
+            
self.get_conn().queries().reports().list(queryId=query_id).execute(num_retries=self.num_retries)
+        )
diff --git a/providers/google/src/airflow/providers/google/marketing_platform/operators/bid_manager.py b/providers/google/src/airflow/providers/google/marketing_platform/operators/bid_manager.py
new file mode 100644
index 00000000000..2f4a43ea4b0
--- /dev/null
+++ b/providers/google/src/airflow/providers/google/marketing_platform/operators/bid_manager.py
@@ -0,0 +1,347 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains operators for Bid Manager API part of the Google 
Display & Video 360."""
+
+from __future__ import annotations
+
+import json
+import shutil
+import tempfile
+import urllib.request
+from collections.abc import Sequence
+from typing import TYPE_CHECKING, Any
+from urllib.parse import urlsplit
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.cloud.hooks.gcs import GCSHook
+from airflow.providers.google.marketing_platform.hooks.bid_manager import 
GoogleBidManagerHook
+from airflow.providers.google.version_compat import BaseOperator
+
+if TYPE_CHECKING:
+    from airflow.providers.common.compat.sdk import Context
+
+
+class GoogleBidManagerCreateQueryOperator(BaseOperator):
+    """
+    Creates a query.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GoogleBidManagerCreateQueryOperator`
+
+    .. seealso::
+        Check also the official API docs:
+        `https://developers.google.com/bid-manager/v2/queries/create`
+
+    :param body: Report object passed to the request's body as described here:
+        https://developers.google.com/bid-manager/v2/queries#Query
+    :param api_version: The version of the api that will be requested for 
example 'v3'.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with the first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "body",
+        "impersonation_chain",
+    )
+    template_ext: Sequence[str] = (".json",)
+
+    def __init__(
+        self,
+        *,
+        body: dict[str, Any],
+        api_version: str = "v2",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.body = body
+        self.api_version = api_version
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def prepare_template(self) -> None:
+        # If .json is passed then we have to read the file
+        if isinstance(self.body, str) and self.body.endswith(".json"):
+            with open(self.body) as file:
+                self.body = json.load(file)
+
+    def execute(self, context: Context) -> dict:
+        hook = GoogleBidManagerHook(
+            gcp_conn_id=self.gcp_conn_id,
+            api_version=self.api_version,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info("Creating Bid Manager API query.")
+        response = hook.create_query(query=self.body)
+        query_id = response["queryId"]
+        context["task_instance"].xcom_push(key="query_id", value=query_id)
+        self.log.info("Created query with ID: %s", query_id)
+        return response
+
+
+class GoogleBidManagerRunQueryOperator(BaseOperator):
+    """
+    Runs a stored query to generate a report.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GoogleBidManagerRunQueryOperator`
+
+    .. seealso::
+        Check also the official API docs:
+        `https://developers.google.com/bid-manager/v2/queries/run`
+
+    :param query_id: Query ID to run.
+    :param parameters: Parameters for running a report as described here:
+        https://developers.google.com/bid-manager/v2/queries/run
+    :param api_version: The version of the api that will be requested for 
example 'v3'.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "query_id",
+        "parameters",
+        "impersonation_chain",
+    )
+
+    def __init__(
+        self,
+        *,
+        query_id: str,
+        parameters: dict[str, Any] | None = None,
+        api_version: str = "v2",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.query_id = query_id
+        self.api_version = api_version
+        self.gcp_conn_id = gcp_conn_id
+        self.parameters = parameters
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Context) -> dict:
+        hook = GoogleBidManagerHook(
+            gcp_conn_id=self.gcp_conn_id,
+            api_version=self.api_version,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info(
+            "Running query %s with the following parameters:\n %s",
+            self.query_id,
+            self.parameters,
+        )
+        response = hook.run_query(query_id=self.query_id, 
params=self.parameters)
+        context["task_instance"].xcom_push(key="query_id", 
value=response["key"]["queryId"])
+        context["task_instance"].xcom_push(key="report_id", 
value=response["key"]["reportId"])
+        return response
+
+
+class GoogleBidManagerDeleteQueryOperator(BaseOperator):
+    """
+    Deletes a stored query as well as the associated stored reports.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GoogleBidManagerDeleteQueryOperator`
+
+    .. seealso::
+        Check also the official API docs:
+        `https://developers.google.com/bid-manager/v2/queries/delete`
+
+    :param query_id: Query ID to delete.
+    :param api_version: The version of the api that will be requested for 
example 'v3'.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "query_id",
+        "impersonation_chain",
+    )
+
+    def __init__(
+        self,
+        *,
+        query_id: str,
+        api_version: str = "v2",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.api_version = api_version
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.query_id = query_id
+
+    def execute(self, context: Context) -> None:
+        hook = GoogleBidManagerHook(
+            gcp_conn_id=self.gcp_conn_id,
+            api_version=self.api_version,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info("Deleting query with id: %s and all connected reports", 
self.query_id)
+        hook.delete_query(query_id=self.query_id)
+        self.log.info("Report deleted.")
+
+
+class GoogleBidManagerDownloadReportOperator(BaseOperator):
+    """
+    Downloads a report generated by a stored query and uploads it to a GCS bucket.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GoogleBidManagerDownloadReportOperator`
+
+    .. seealso::
+        Check also the official API docs:
+        `https://developers.google.com/bid-manager/v2/queries/get`
+
+    :param report_id: Report ID to retrieve.
+    :param query_id: Query ID for which report was generated.
+    :param bucket_name: The bucket to upload to.
+    :param report_name: The report name to set when uploading the local file.
+    :param chunk_size: File will be downloaded in chunks of this many bytes.
+    :param gzip: Option to compress local file or file data for upload
+    :param api_version: The version of the api that will be requested for 
example 'v3'.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "query_id",
+        "report_id",
+        "bucket_name",
+        "report_name",
+        "impersonation_chain",
+    )
+
+    def __init__(
+        self,
+        *,
+        query_id: str,
+        report_id: str,
+        bucket_name: str,
+        report_name: str | None = None,
+        gzip: bool = True,
+        chunk_size: int = 10 * 1024 * 1024,
+        api_version: str = "v2",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.query_id = query_id
+        self.report_id = report_id
+        self.chunk_size = chunk_size
+        self.gzip = gzip
+        self.bucket_name = bucket_name
+        self.report_name = report_name
+        self.api_version = api_version
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def _resolve_file_name(self, name: str) -> str:
+        new_name = name if name.endswith(".csv") else f"{name}.csv"
+        new_name = f"{new_name}.gz" if self.gzip else new_name
+        return new_name
+
+    @staticmethod
+    def _set_bucket_name(name: str) -> str:
+        bucket = name if not name.startswith("gs://") else name[5:]
+        return bucket.strip("/")
+
+    def execute(self, context: Context):
+        hook = GoogleBidManagerHook(
+            gcp_conn_id=self.gcp_conn_id,
+            api_version=self.api_version,
+            impersonation_chain=self.impersonation_chain,
+        )
+        gcs_hook = GCSHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        resource = hook.get_report(query_id=self.query_id, 
report_id=self.report_id)
+        status = resource.get("metadata", {}).get("status", {}).get("state")
+        if resource and status not in ["DONE", "FAILED"]:
+            raise AirflowException(f"Report {self.report_id} for query 
{self.query_id} is still running")
+
+        # If no custom report_name provided, use Bid Manager name
+        file_url = resource["metadata"]["googleCloudStoragePath"]
+        if urllib.parse.urlparse(file_url).scheme == "file":
+            raise AirflowException("Accessing local file is not allowed in 
this operator")
+        report_name = self.report_name or 
urlsplit(file_url).path.split("/")[-1]
+        report_name = self._resolve_file_name(report_name)
+
+        # Download the report
+        self.log.info("Starting downloading report %s", self.report_id)
+        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
+            with urllib.request.urlopen(file_url) as response:  # nosec
+                shutil.copyfileobj(response, temp_file, length=self.chunk_size)
+
+            temp_file.flush()
+            # Upload the local file to bucket
+            bucket_name = self._set_bucket_name(self.bucket_name)
+            gcs_hook.upload(
+                bucket_name=bucket_name,
+                object_name=report_name,
+                gzip=self.gzip,
+                filename=temp_file.name,
+                mime_type="text/csv",
+            )
+        self.log.info(
+            "Report %s was saved in bucket %s as %s.",
+            self.report_id,
+            self.bucket_name,
+            report_name,
+        )
+        context["task_instance"].xcom_push(key="report_name", 
value=report_name)
diff --git a/providers/google/src/airflow/providers/google/marketing_platform/sensors/display_video.py b/providers/google/src/airflow/providers/google/marketing_platform/sensors/bid_manager.py
similarity index 67%
copy from 
providers/google/src/airflow/providers/google/marketing_platform/sensors/display_video.py
copy to 
providers/google/src/airflow/providers/google/marketing_platform/sensors/bid_manager.py
index 433262c7698..75c806c6b62 100644
--- 
a/providers/google/src/airflow/providers/google/marketing_platform/sensors/display_video.py
+++ 
b/providers/google/src/airflow/providers/google/marketing_platform/sensors/bid_manager.py
@@ -14,30 +14,31 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Sensor for detecting the completion of DV360 reports."""
+"""Sensor for detecting the completion of DV360 Bid Manager reports."""
 
 from __future__ import annotations
 
 from collections.abc import Sequence
 from typing import TYPE_CHECKING
 
-from airflow.providers.common.compat.sdk import AirflowException, 
BaseSensorOperator
-from airflow.providers.google.marketing_platform.hooks.display_video import 
GoogleDisplayVideo360Hook
+from airflow.providers.common.compat.sdk import BaseSensorOperator
+from airflow.providers.google.marketing_platform.hooks.bid_manager import 
GoogleBidManagerHook
 
 if TYPE_CHECKING:
     from airflow.providers.common.compat.sdk import Context
 
 
-class GoogleDisplayVideo360GetSDFDownloadOperationSensor(BaseSensorOperator):
+class GoogleBidManagerRunQuerySensor(BaseSensorOperator):
     """
-    Sensor for detecting the completion of SDF operation.
+    Sensor for detecting the completion of DV360 Bid Manager reports for API 
v2.
 
     .. seealso::
         For more information on how to use this operator, take a look at the 
guide:
-        
:ref:`howto/operator:GoogleDisplayVideo360GetSDFDownloadOperationSensor`
+        :ref:`howto/operator:GoogleBidManagerRunQuerySensor`
 
-    :param operation_name: The name of the operation resource
-    :param api_version: The version of the api that will be requested for 
example 'v1'.
+    :param query_id: Query ID for which report was generated
+    :param report_id: Report ID for which you want to wait
+    :param api_version: The version of the api that will be requested for 
example 'v3'.
     :param gcp_conn_id: The connection ID to use when fetching connection info.
     :param impersonation_chain: Optional service account to impersonate using 
short-term
         credentials, or chained list of accounts required to get the 
access_token
@@ -47,43 +48,41 @@ class 
GoogleDisplayVideo360GetSDFDownloadOperationSensor(BaseSensorOperator):
         If set as a sequence, the identities from the list must grant
         Service Account Token Creator IAM role to the directly preceding 
identity, with first
         account from the list granting this role to the originating account 
(templated).
-
     """
 
     template_fields: Sequence[str] = (
-        "operation_name",
+        "query_id",
+        "report_id",
         "impersonation_chain",
     )
 
     def __init__(
         self,
-        operation_name: str,
-        api_version: str = "v4",
+        *,
+        query_id: str,
+        report_id: str,
+        api_version: str = "v2",
         gcp_conn_id: str = "google_cloud_default",
-        mode: str = "reschedule",
-        poke_interval: int = 60 * 5,
         impersonation_chain: str | Sequence[str] | None = None,
-        *args,
         **kwargs,
     ) -> None:
-        super().__init__(*args, **kwargs)
-        self.mode = mode
-        self.poke_interval = poke_interval
-        self.operation_name = operation_name
+        super().__init__(**kwargs)
+        self.query_id = query_id
+        self.report_id = report_id
         self.api_version = api_version
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
     def poke(self, context: Context) -> bool:
-        hook = GoogleDisplayVideo360Hook(
+        hook = GoogleBidManagerHook(
             gcp_conn_id=self.gcp_conn_id,
             api_version=self.api_version,
             impersonation_chain=self.impersonation_chain,
         )
-        operation = 
hook.get_sdf_download_operation(operation_name=self.operation_name)
-        if "error" in operation:
-            message = f"The operation finished in error with 
{operation['error']}"
-            raise AirflowException(message)
-        if operation and operation.get("done"):
+
+        response = hook.get_report(query_id=self.query_id, 
report_id=self.report_id)
+        status = response.get("metadata", {}).get("status", {}).get("state")
+        self.log.info("STATUS OF THE REPORT %s FOR QUERY %s: %s", 
self.report_id, self.query_id, status)
+        if response and status in ["DONE", "FAILED"]:
             return True
         return False
diff --git 
a/providers/google/src/airflow/providers/google/marketing_platform/sensors/display_video.py
 
b/providers/google/src/airflow/providers/google/marketing_platform/sensors/display_video.py
index 433262c7698..83ab3df4f99 100644
--- 
a/providers/google/src/airflow/providers/google/marketing_platform/sensors/display_video.py
+++ 
b/providers/google/src/airflow/providers/google/marketing_platform/sensors/display_video.py
@@ -14,7 +14,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Sensor for detecting the completion of DV360 reports."""
+"""Sensor for detecting the completion of DV360 SDF operations."""
 
 from __future__ import annotations
 
diff --git 
a/providers/google/tests/system/google/marketing_platform/example_bid_manager.py
 
b/providers/google/tests/system/google/marketing_platform/example_bid_manager.py
new file mode 100644
index 00000000000..9315258fa12
--- /dev/null
+++ 
b/providers/google/tests/system/google/marketing_platform/example_bid_manager.py
@@ -0,0 +1,214 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG that shows how to use Bid Manager API from 
GoogleDisplayVideo360.
+"""
+
+from __future__ import annotations
+
+import json
+import os
+from datetime import datetime
+from typing import cast
+
+from airflow.models.dag import DAG
+from airflow.models.xcom_arg import XComArg
+from airflow.providers.google.cloud.operators.gcs import 
GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.marketing_platform.operators.bid_manager import (
+    GoogleBidManagerCreateQueryOperator,
+    GoogleBidManagerDeleteQueryOperator,
+    GoogleBidManagerDownloadReportOperator,
+    GoogleBidManagerRunQueryOperator,
+)
+from airflow.providers.google.marketing_platform.sensors.bid_manager import (
+    GoogleBidManagerRunQuerySensor,
+)
+
+try:
+    from airflow.sdk import TriggerRule
+except ImportError:
+    # Compatibility for Airflow < 3.1
+    from airflow.utils.trigger_rule import TriggerRule  # type: 
ignore[no-redef,attr-defined]
+from google.cloud.exceptions import NotFound
+
+try:
+    from airflow.sdk import task
+except ImportError:
+    # Airflow 2 path
+    from airflow.decorators import task  # type: ignore[attr-defined,no-redef]
+from airflow.providers.google.cloud.hooks.secret_manager import (
+    GoogleCloudSecretManagerHook,
+)
+
+from system.google.gcp_api_client_helpers import create_airflow_connection, 
delete_airflow_connection
+
+DAG_ID = "bid_manager"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+CONNECTION_TYPE = "google_cloud_platform"
+CONN_ID = "google_display_video_default"
+DISPLAY_VIDEO_SERVICE_ACCOUNT_KEY = "google_display_video_service_account_key"
+IS_COMPOSER = bool(os.environ.get("COMPOSER_ENVIRONMENT", ""))
+
+
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+ADVERTISER_ID = os.environ.get("GMP_ADVERTISER_ID", "1234567")
+
+REPORT = {
+    "metadata": {
+        "title": "Airflow Test Report",
+        "dataRange": {"range": "LAST_7_DAYS"},
+        "format": "CSV",
+        "sendNotification": False,
+    },
+    "params": {
+        "type": "STANDARD",
+        "groupBys": ["FILTER_DATE", "FILTER_PARTNER"],
+        "filters": [{"type": "FILTER_PARTNER", "value": ADVERTISER_ID}],
+        "metrics": ["METRIC_IMPRESSIONS", "METRIC_CLICKS"],
+    },
+    "schedule": {"frequency": "ONE_TIME"},
+}
+
+PARAMETERS = {
+    "dataRange": {"range": "LAST_7_DAYS"},
+}
+
+
+def get_secret(secret_id: str) -> str:
+    hook = GoogleCloudSecretManagerHook()
+    if hook.secret_exists(secret_id=secret_id):
+        return hook.access_secret(secret_id=secret_id).payload.data.decode()
+    raise NotFound(f"The secret {secret_id} not found")
+
+
+with DAG(
+    DAG_ID,
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "bid_manager"],
+    schedule="@once",
+) as dag:
+
+    @task
+    def get_display_video_service_account_key():
+        return get_secret(secret_id=DISPLAY_VIDEO_SERVICE_ACCOUNT_KEY)
+
+    get_display_video_service_account_key_task = 
get_display_video_service_account_key()
+
+    @task
+    def create_connection_display_video(connection_id: str, key) -> None:
+        conn_extra_json = json.dumps(
+            {
+                "keyfile_dict": key,
+                "project": PROJECT_ID,
+                "scope": "https://www.googleapis.com/auth/display-video, 
https://www.googleapis.com/auth/cloud-platform, 
https://www.googleapis.com/auth/doubleclickbidmanager",
+            }
+        )
+        create_airflow_connection(
+            connection_id=connection_id,
+            connection_conf={"conn_type": CONNECTION_TYPE, "extra": 
conn_extra_json},
+            is_composer=IS_COMPOSER,
+        )
+
+    create_connection_display_video_task = create_connection_display_video(
+        connection_id=CONN_ID, key=get_display_video_service_account_key_task
+    )
+
+    @task(task_id="delete_connection_task")
+    def delete_connection_display_video(connection_id: str) -> None:
+        delete_airflow_connection(connection_id=connection_id, 
is_composer=IS_COMPOSER)
+
+    delete_connection_task = 
delete_connection_display_video(connection_id=CONN_ID)
+
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket", bucket_name=BUCKET_NAME, 
project_id=PROJECT_ID, gcp_conn_id=CONN_ID
+    )
+    # [START howto_google_bid_manager_create_query_operator]
+    create_query = GoogleBidManagerCreateQueryOperator(
+        body=REPORT, task_id="create_query", gcp_conn_id=CONN_ID
+    )
+
+    query_id = cast("str", XComArg(create_query, key="query_id"))
+    # [END howto_google_bid_manager_create_query_operator]
+
+    # [START howto_google_bid_manager_run_query_report_operator]
+    run_query = GoogleBidManagerRunQueryOperator(
+        query_id=query_id, parameters=PARAMETERS, task_id="run_report", 
gcp_conn_id=CONN_ID
+    )
+
+    query_id = cast("str", XComArg(run_query, key="query_id"))
+    report_id = cast("str", XComArg(run_query, key="report_id"))
+    # [END howto_google_bid_manager_run_query_report_operator]
+
+    # [START howto_google_bid_manager_wait_run_query_sensor]
+    wait_for_query = GoogleBidManagerRunQuerySensor(
+        task_id="wait_for_query",
+        query_id=query_id,
+        report_id=report_id,
+        gcp_conn_id=CONN_ID,
+    )
+    # [END howto_google_bid_manager_wait_run_query_sensor]
+
+    # [START howto_google_bid_manager_get_report_operator]
+    get_report = GoogleBidManagerDownloadReportOperator(
+        query_id=query_id,
+        report_id=report_id,
+        task_id="get_report",
+        bucket_name=BUCKET_NAME,
+        report_name="test1.csv",
+        gcp_conn_id=CONN_ID,
+    )
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=BUCKET_NAME,
+        gcp_conn_id=CONN_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    # [END howto_google_bid_manager_get_report_operator]
+
+    # [START howto_google_bid_manager_delete_query_operator]
+    delete_query = GoogleBidManagerDeleteQueryOperator(
+        query_id=query_id, task_id="delete_query", 
trigger_rule=TriggerRule.ALL_DONE, gcp_conn_id=CONN_ID
+    )
+    # [END howto_google_bid_manager_delete_query_operator]
+
+    (
+        get_display_video_service_account_key_task
+        >> create_connection_display_video_task  # type: ignore
+        >> create_bucket
+        >> create_query
+        >> run_query
+        >> wait_for_query
+        >> get_report
+        >> delete_query
+        >> delete_bucket
+        >> delete_connection_task
+    )
+
+    from tests_common.test_utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git 
a/providers/google/tests/unit/google/marketing_platform/hooks/test_bid_manager.py
 
b/providers/google/tests/unit/google/marketing_platform/hooks/test_bid_manager.py
new file mode 100644
index 00000000000..d3cd91419aa
--- /dev/null
+++ 
b/providers/google/tests/unit/google/marketing_platform/hooks/test_bid_manager.py
@@ -0,0 +1,145 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+from airflow.providers.google.marketing_platform.hooks.bid_manager import 
GoogleBidManagerHook
+
+from unit.google.cloud.utils.base_gcp_mock import 
mock_base_gcp_hook_default_project_id
+
+API_VERSION = "v2"
+GCP_CONN_ID = "google_cloud_default"
+
+
+class TestGoogleBidManagerHook:
+    def setup_method(self):
+        with mock.patch(
+            
"airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__",
+            new=mock_base_gcp_hook_default_project_id,
+        ):
+            self.hook = GoogleBidManagerHook(api_version=API_VERSION, 
gcp_conn_id=GCP_CONN_ID)
+
+    @mock.patch(
+        
"airflow.providers.google.marketing_platform.hooks.bid_manager.GoogleBidManagerHook._authorize"
+    )
+    
@mock.patch("airflow.providers.google.marketing_platform.hooks.bid_manager.build")
+    def test_gen_conn(self, mock_build, mock_authorize):
+        result = self.hook.get_conn()
+        mock_build.assert_called_once_with(
+            "doubleclickbidmanager",
+            API_VERSION,
+            http=mock_authorize.return_value,
+            cache_discovery=False,
+        )
+        assert mock_build.return_value == result
+
+    
@mock.patch("airflow.providers.google.marketing_platform.hooks.bid_manager.GoogleBidManagerHook.get_conn")
+    def test_create_query(self, get_conn_mock):
+        body = {"body": "test"}
+
+        return_value = "TEST"
+        
get_conn_mock.return_value.queries.return_value.create.return_value.execute.return_value
 = (
+            return_value
+        )
+        result = self.hook.create_query(query=body)
+
+        
get_conn_mock.return_value.queries.return_value.create.assert_called_once_with(body=body)
+
+        assert return_value == result
+
+    
@mock.patch("airflow.providers.google.marketing_platform.hooks.bid_manager.GoogleBidManagerHook.get_conn")
+    def test_delete_query(self, get_conn_mock):
+        query_id = "QUERY_ID"
+
+        return_value = "TEST"
+        
get_conn_mock.return_value.queries.return_value.delete.return_value.execute.return_value
 = (
+            return_value
+        )
+        self.hook.delete_query(query_id=query_id)
+
+        
get_conn_mock.return_value.queries.return_value.delete.assert_called_once_with(queryId=query_id)
+
+    
@mock.patch("airflow.providers.google.marketing_platform.hooks.bid_manager.GoogleBidManagerHook.get_conn")
+    def test_get_query(self, get_conn_mock):
+        query_id = "QUERY_ID"
+
+        return_value = "TEST"
+        
get_conn_mock.return_value.queries.return_value.get.return_value.execute.return_value
 = return_value
+        result = self.hook.get_query(query_id=query_id)
+
+        
get_conn_mock.return_value.queries.return_value.get.assert_called_once_with(queryId=query_id)
+
+        assert return_value == result
+
+    
@mock.patch("airflow.providers.google.marketing_platform.hooks.bid_manager.GoogleBidManagerHook.get_conn")
+    def test_list_queries(self, get_conn_mock):
+        queries = ["test"]
+        return_value = {"queries": queries}
+        
get_conn_mock.return_value.queries.return_value.list.return_value.execute.return_value
 = return_value
+        result = self.hook.list_queries()
+
+        
get_conn_mock.return_value.queries.return_value.list.assert_called_once_with()
+
+        assert queries == result
+
+    
@mock.patch("airflow.providers.google.marketing_platform.hooks.bid_manager.GoogleBidManagerHook.get_conn")
+    def test_run_query(self, get_conn_mock):
+        query_id = "QUERY_ID"
+        params = {"params": "test"}
+        return_value = "TEST"
+        
get_conn_mock.return_value.queries.return_value.run.return_value.execute.return_value
 = return_value
+
+        result = self.hook.run_query(query_id=query_id, params=params)
+
+        
get_conn_mock.return_value.queries.return_value.run.assert_called_once_with(
+            queryId=query_id, body=params
+        )
+        assert return_value == result
+
+    
@mock.patch("airflow.providers.google.marketing_platform.hooks.bid_manager.GoogleBidManagerHook.get_conn")
+    def test_get_report(self, get_conn_mock):
+        query_id = "QUERY_ID"
+        report_id = "REPORT_ID"
+        return_value = "TEST_REPORT"
+        (
+            
get_conn_mock.return_value.queries.return_value.reports.return_value.get.return_value.execute.return_value
+        ) = return_value
+
+        result = self.hook.get_report(query_id=query_id, report_id=report_id)
+
+        
get_conn_mock.return_value.queries.return_value.reports.return_value.get.assert_called_once_with(
+            queryId=query_id, reportId=report_id
+        )
+        assert return_value == result
+
+    
@mock.patch("airflow.providers.google.marketing_platform.hooks.bid_manager.GoogleBidManagerHook.get_conn")
+    def test_list_reports(self, get_conn_mock):
+        query_id = "QUERY_ID"
+        reports = ["report1", "report2"]
+        return_value = {"reports": reports}
+        (
+            
get_conn_mock.return_value.queries.return_value.reports.return_value.list.return_value.execute.return_value
+        ) = return_value
+
+        result = self.hook.list_reports(query_id=query_id)
+
+        
get_conn_mock.return_value.queries.return_value.reports.return_value.list.assert_called_once_with(
+            queryId=query_id
+        )
+        assert reports == result["reports"]
diff --git 
a/providers/google/tests/unit/google/marketing_platform/operators/test_bid_manager.py
 
b/providers/google/tests/unit/google/marketing_platform/operators/test_bid_manager.py
new file mode 100644
index 00000000000..20c323e57c0
--- /dev/null
+++ 
b/providers/google/tests/unit/google/marketing_platform/operators/test_bid_manager.py
@@ -0,0 +1,257 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+from tempfile import NamedTemporaryFile
+from unittest import mock
+
+import pytest
+from sqlalchemy import delete
+
+from airflow.exceptions import AirflowException
+from airflow.models import TaskInstance as TI
+from airflow.providers.google.marketing_platform.operators.bid_manager import (
+    GoogleBidManagerCreateQueryOperator,
+    GoogleBidManagerDeleteQueryOperator,
+    GoogleBidManagerDownloadReportOperator,
+    GoogleBidManagerRunQueryOperator,
+)
+from airflow.utils import timezone
+from airflow.utils.session import create_session
+
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+API_VERSION = "v2"
+GCP_CONN_ID = "google_cloud_default"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+
+DEFAULT_DATE = timezone.datetime(2021, 1, 1)
+REPORT_ID = "report_id"
+BUCKET_NAME = "test_bucket"
+REPORT_NAME = "test_report.csv"
+QUERY_ID = FILENAME = "test.csv"
+
+
+class TestGoogleBidManagerDeleteQueryOperator:
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.bid_manager.GoogleBidManagerHook")
+    def test_execute(self, hook_mock):
+        op = GoogleBidManagerDeleteQueryOperator(
+            query_id=QUERY_ID, api_version=API_VERSION, task_id="test_task"
+        )
+        op.execute(context=None)
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            api_version=API_VERSION,
+            impersonation_chain=None,
+        )
+        
hook_mock.return_value.delete_query.assert_called_once_with(query_id=QUERY_ID)
+
+
[email protected]_test
+class TestGoogleBidManagerDownloadReportOperator:
+    def setup_method(self):
+        with create_session() as session:
+            session.execute(delete(TI))
+
+    def teardown_method(self):
+        with create_session() as session:
+            session.execute(delete(TI))
+
+    @pytest.mark.parametrize(
+        ("file_path", "should_except"), [("https://host/path", False), 
("file:/path/to/file", True)]
+    )
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.bid_manager.shutil")
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.bid_manager.urllib.request")
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.bid_manager.tempfile")
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.bid_manager.GCSHook")
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.bid_manager.GoogleBidManagerHook")
+    def test_execute(
+        self,
+        mock_hook,
+        mock_gcs_hook,
+        mock_temp,
+        mock_request,
+        mock_shutil,
+        file_path,
+        should_except,
+    ):
+        mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name 
= FILENAME
+        mock_hook.return_value.get_report.return_value = {
+            "metadata": {
+                "status": {"state": "DONE"},
+                "googleCloudStoragePath": file_path,
+            }
+        }
+        # Create mock context with task_instance
+        mock_context = {"task_instance": mock.Mock()}
+
+        op = GoogleBidManagerDownloadReportOperator(
+            query_id=QUERY_ID,
+            report_id=REPORT_ID,
+            bucket_name=BUCKET_NAME,
+            report_name=REPORT_NAME,
+            task_id="test_task",
+        )
+        if should_except:
+            with pytest.raises(AirflowException):
+                op.execute(context=mock_context)
+            return
+        op.execute(context=mock_context)
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            api_version="v2",
+            impersonation_chain=None,
+        )
+        
mock_hook.return_value.get_report.assert_called_once_with(report_id=REPORT_ID, 
query_id=QUERY_ID)
+
+        mock_gcs_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=None,
+        )
+        mock_gcs_hook.return_value.upload.assert_called_once_with(
+            bucket_name=BUCKET_NAME,
+            filename=FILENAME,
+            gzip=True,
+            mime_type="text/csv",
+            object_name=REPORT_NAME + ".gz",
+        )
+        mock_context["task_instance"].xcom_push.assert_called_once_with(
+            key="report_name", value=REPORT_NAME + ".gz"
+        )
+
+    @pytest.mark.parametrize(
+        "test_bucket_name",
+        [BUCKET_NAME, f"gs://{BUCKET_NAME}", "XComArg", "{{ 
ti.xcom_pull(task_ids='taskflow_op') }}"],
+    )
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.bid_manager.shutil")
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.bid_manager.urllib.request")
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.bid_manager.tempfile")
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.bid_manager.GCSHook")
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.bid_manager.GoogleBidManagerHook")
+    def test_set_bucket_name(
+        self,
+        mock_hook,
+        mock_gcs_hook,
+        mock_temp,
+        mock_request,
+        mock_shutil,
+        test_bucket_name,
+        dag_maker,
+    ):
+        mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name 
= FILENAME
+        mock_hook.return_value.get_report.return_value = {
+            "metadata": {"status": {"state": "DONE"}, 
"googleCloudStoragePath": "TEST"}
+        }
+        with dag_maker(dag_id="test_set_bucket_name", start_date=DEFAULT_DATE) 
as dag:
+            if BUCKET_NAME not in test_bucket_name:
+
+                @dag.task(task_id="taskflow_op")
+                def f():
+                    return BUCKET_NAME
+
+                taskflow_op = f()
+
+            op = GoogleBidManagerDownloadReportOperator(
+                query_id=QUERY_ID,
+                report_id=REPORT_ID,
+                bucket_name=test_bucket_name if test_bucket_name != "XComArg" 
else taskflow_op,
+                report_name=REPORT_NAME,
+                task_id="test_task",
+            )
+
+            if test_bucket_name == "{{ ti.xcom_pull(task_ids='taskflow_op') 
}}":
+                taskflow_op >> op
+
+        if AIRFLOW_V_3_0_PLUS:
+            dag.test()
+        else:
+            dr = dag_maker.create_dagrun()
+            for ti in dr.get_task_instances():
+                ti.run()
+
+        mock_gcs_hook.return_value.upload.assert_called_once_with(
+            bucket_name=BUCKET_NAME,
+            filename=FILENAME,
+            gzip=True,
+            mime_type="text/csv",
+            object_name=REPORT_NAME + ".gz",
+        )
+
+
+class TestGoogleBidManagerRunQueryOperator:
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.bid_manager.GoogleBidManagerHook")
+    def test_execute(self, hook_mock):
+        parameters = {"param": "test"}
+
+        # Create mock context with task_instance
+        mock_context = {"task_instance": mock.Mock()}
+
+        hook_mock.return_value.run_query.return_value = {
+            "key": {
+                "queryId": QUERY_ID,
+                "reportId": REPORT_ID,
+            }
+        }
+        op = GoogleBidManagerRunQueryOperator(
+            query_id=QUERY_ID,
+            parameters=parameters,
+            api_version=API_VERSION,
+            task_id="test_task",
+        )
+        op.execute(context=mock_context)
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            api_version=API_VERSION,
+            impersonation_chain=None,
+        )
+
+        
mock_context["task_instance"].xcom_push.assert_any_call(key="query_id", 
value=QUERY_ID)
+        
mock_context["task_instance"].xcom_push.assert_any_call(key="report_id", 
value=REPORT_ID)
+        
hook_mock.return_value.run_query.assert_called_once_with(query_id=QUERY_ID, 
params=parameters)
+
+
+class TestGoogleBidManagerCreateQueryOperator:
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.bid_manager.GoogleBidManagerHook")
+    def test_execute(self, hook_mock):
+        body = {"body": "test"}
+
+        # Create mock context with task_instance
+        mock_context = {"task_instance": mock.Mock()}
+
+        hook_mock.return_value.create_query.return_value = {"queryId": 
QUERY_ID}
+        op = GoogleBidManagerCreateQueryOperator(body=body, 
task_id="test_task")
+        op.execute(context=mock_context)
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            api_version="v2",
+            impersonation_chain=None,
+        )
+        hook_mock.return_value.create_query.assert_called_once_with(query=body)
+        
mock_context["task_instance"].xcom_push.assert_called_once_with(key="query_id", 
value=QUERY_ID)
+
+    def test_prepare_template(self):
+        body = {"key": "value"}
+        with NamedTemporaryFile("w+", suffix=".json") as f:
+            f.write(json.dumps(body))
+            f.flush()
+            op = GoogleBidManagerCreateQueryOperator(body=body, 
task_id="test_task")
+            op.prepare_template()
+
+            assert isinstance(op.body, dict)
+            assert op.body == body
diff --git 
a/providers/google/tests/unit/google/marketing_platform/sensors/test_bid_manager.py
 
b/providers/google/tests/unit/google/marketing_platform/sensors/test_bid_manager.py
new file mode 100644
index 00000000000..f48810e60de
--- /dev/null
+++ 
b/providers/google/tests/unit/google/marketing_platform/sensors/test_bid_manager.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+from airflow.providers.google.marketing_platform.sensors.bid_manager import (
+    GoogleBidManagerRunQuerySensor,
+)
+
+MODULE_NAME = "airflow.providers.google.marketing_platform.sensors.bid_manager"
+
+GCP_CONN_ID = "google_cloud_default"
+
+
+class TestGoogleBidManagerRunQuerySensor:
+    @mock.patch(f"{MODULE_NAME}.GoogleBidManagerHook")
+    @mock.patch(f"{MODULE_NAME}.BaseSensorOperator")
+    def test_poke(self, mock_base_op, hook_mock):
+        query_id = "QUERY_ID"
+        report_id = "REPORT_ID"
+        op = GoogleBidManagerRunQuerySensor(query_id=query_id, 
report_id=report_id, task_id="test_task")
+        op.poke(context=None)
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            api_version="v2",
+            impersonation_chain=None,
+        )
+        
hook_mock.return_value.get_report.assert_called_once_with(query_id=query_id, 
report_id=report_id)

Reply via email to