This is an automated email from the ASF dual-hosted git repository.
shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 57141585e86 Add BigQuery routine operators and existence sensor
(#65499)
57141585e86 is described below
commit 57141585e86f8e9dc43b696fa5a2e8439fe3931e
Author: Ashir Alam <[email protected]>
AuthorDate: Tue May 12 09:39:45 2026 -0400
Add BigQuery routine operators and existence sensor (#65499)
---
providers/google/docs/operators/cloud/bigquery.rst | 8 +
.../docs/operators/cloud/bigquery_routines.rst | 164 +++++++
providers/google/provider.yaml | 1 +
.../providers/google/cloud/hooks/bigquery.py | 252 +++++++++++
.../providers/google/cloud/operators/bigquery.py | 492 +++++++++++++++++++++
.../providers/google/cloud/sensors/bigquery.py | 63 +++
.../airflow/providers/google/get_provider_info.py | 5 +-
.../cloud/bigquery/example_bigquery_routines.py | 219 +++++++++
.../tests/unit/google/cloud/hooks/test_bigquery.py | 172 +++++++
.../unit/google/cloud/operators/test_bigquery.py | 183 ++++++++
.../unit/google/cloud/sensors/test_bigquery.py | 39 ++
11 files changed, 1597 insertions(+), 1 deletion(-)
diff --git a/providers/google/docs/operators/cloud/bigquery.rst
b/providers/google/docs/operators/cloud/bigquery.rst
index 41c49dd5e03..82f43c319dc 100644
--- a/providers/google/docs/operators/cloud/bigquery.rst
+++ b/providers/google/docs/operators/cloud/bigquery.rst
@@ -289,6 +289,14 @@ You can also use this operator to delete a materialized
view.
:start-after: [START howto_operator_bigquery_delete_materialized_view]
:end-before: [END howto_operator_bigquery_delete_materialized_view]
+Manage routines
+^^^^^^^^^^^^^^^
+
+Airflow exposes the BigQuery routines API (user-defined functions, stored
+procedures, and table-valued functions) through a small set of dedicated
+operators and a sensor. See :doc:`bigquery_routines` for the full guide with
+examples for each routine type.
+
.. _howto/operator:BigQueryInsertJobOperator:
Execute BigQuery jobs
diff --git a/providers/google/docs/operators/cloud/bigquery_routines.rst
b/providers/google/docs/operators/cloud/bigquery_routines.rst
new file mode 100644
index 00000000000..1cd317f438f
--- /dev/null
+++ b/providers/google/docs/operators/cloud/bigquery_routines.rst
@@ -0,0 +1,164 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+Google Cloud BigQuery Routines Operators
+========================================
+
+`BigQuery routines <https://cloud.google.com/bigquery/docs/routines>`__ are
+dataset-scoped resources that encapsulate logic you can reuse from SQL:
+
+* **Scalar user-defined functions** (SQL or JavaScript)
+* **Stored procedures** (SQL or Apache Spark)
+* **Table-valued functions** (SQL)
+* **User-defined aggregate functions** (SQL)
+* **Remote functions** backed by Cloud Run / Cloud Functions
+
+Airflow exposes the BigQuery routines API so your DAG can own both the routine
+definitions and the pipeline that depends on them, instead of embedding
+``CREATE FUNCTION`` / ``CREATE PROCEDURE`` DDL in a query job.
+
+Prerequisite Tasks
+^^^^^^^^^^^^^^^^^^
+
+.. include:: /operators/_partials/prerequisite_tasks.rst
+
+.. _howto/operator:BigQueryCreateRoutineOperator:
+
+Create a routine
+^^^^^^^^^^^^^^^^
+
+Use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryCreateRoutineOperator`
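+to create any routine type. Routine fields mirror the BigQuery REST API's
+``Routine`` resource. Pass them individually as keyword arguments, or pass the
+complete resource via ``routine_resource``. When both are given, a value already
+present in ``routine_resource`` takes precedence over the matching keyword argument.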
+
+The ``if_exists`` argument controls collision behavior:
+
+* ``"fail"`` (default) — raise when the routine already exists.
+* ``"skip"`` — leave the existing routine in place and return it.
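+* ``"replace"`` — delete the existing routine, then create the new one. These are
+  two separate API calls, so the routine is briefly absent and is not restored if
+  the create call fails.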
+
+Scalar SQL UDF:
+
+.. exampleinclude::
/../../google/tests/system/google/cloud/bigquery/example_bigquery_routines.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_bigquery_create_scalar_routine]
+ :end-before: [END howto_operator_bigquery_create_scalar_routine]
+
+Stored procedure:
+
+.. exampleinclude::
/../../google/tests/system/google/cloud/bigquery/example_bigquery_routines.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_bigquery_create_procedure_routine]
+ :end-before: [END howto_operator_bigquery_create_procedure_routine]
+
+Table-valued function:
+
+.. exampleinclude::
/../../google/tests/system/google/cloud/bigquery/example_bigquery_routines.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_bigquery_create_tvf_routine]
+ :end-before: [END howto_operator_bigquery_create_tvf_routine]
+
+.. _howto/operator:BigQueryUpdateRoutineOperator:
+
+Update a routine
+^^^^^^^^^^^^^^^^
+
+Use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateRoutineOperator`
+to patch selected fields of an existing routine. Only the fields listed in
+``fields`` are updated; any listed field that is unset in ``routine_resource``
+is cleared on the server.
+
+.. exampleinclude::
/../../google/tests/system/google/cloud/bigquery/example_bigquery_routines.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_bigquery_update_routine]
+ :end-before: [END howto_operator_bigquery_update_routine]
+
+.. _howto/operator:BigQueryGetRoutineOperator:
+
+Fetch a routine
+^^^^^^^^^^^^^^^
+
+Use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryGetRoutineOperator`
+to read a routine's metadata. The operator pushes the serialized resource to
+XCom.
+
+.. exampleinclude::
/../../google/tests/system/google/cloud/bigquery/example_bigquery_routines.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_bigquery_get_routine]
+ :end-before: [END howto_operator_bigquery_get_routine]
+
+.. _howto/operator:BigQueryListRoutinesOperator:
+
+List routines
+^^^^^^^^^^^^^
+
+Use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryListRoutinesOperator`
+to list all routines in a dataset. Only a subset of each routine's fields is
+returned; use ``BigQueryGetRoutineOperator`` for the complete resource.
+
+.. exampleinclude::
/../../google/tests/system/google/cloud/bigquery/example_bigquery_routines.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_bigquery_list_routines]
+ :end-before: [END howto_operator_bigquery_list_routines]
+
+.. _howto/operator:BigQueryDeleteRoutineOperator:
+
+Delete a routine
+^^^^^^^^^^^^^^^^
+
+Use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteRoutineOperator`
+to remove a routine. Set ``ignore_if_missing=True`` to make the delete a no-op
+when the routine does not exist.
+
+.. exampleinclude::
/../../google/tests/system/google/cloud/bigquery/example_bigquery_routines.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_bigquery_delete_routine]
+ :end-before: [END howto_operator_bigquery_delete_routine]
+
+.. _howto/sensor:BigQueryRoutineExistenceSensor:
+
+Wait for a routine
+^^^^^^^^^^^^^^^^^^
+
+Use
:class:`~airflow.providers.google.cloud.sensors.bigquery.BigQueryRoutineExistenceSensor`
+to block downstream tasks until a routine exists. This is useful when routine
+creation happens in a separate DAG or an external system.
+
+.. exampleinclude::
/../../google/tests/system/google/cloud/bigquery/example_bigquery_routines.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_sensor_bigquery_routine_existence]
+ :end-before: [END howto_sensor_bigquery_routine_existence]
+
+Reference
+^^^^^^^^^
+
+For further information, look at:
+
+* `Google Cloud API Documentation
<https://cloud.google.com/bigquery/docs/reference/rest/v2/routines>`__
+* `User-defined functions
<https://cloud.google.com/bigquery/docs/user-defined-functions>`__
+* `Stored procedures <https://cloud.google.com/bigquery/docs/procedures>`__
+* `Table functions <https://cloud.google.com/bigquery/docs/table-functions>`__
diff --git a/providers/google/provider.yaml b/providers/google/provider.yaml
index aff958b5429..4eed2dd95e2 100644
--- a/providers/google/provider.yaml
+++ b/providers/google/provider.yaml
@@ -148,6 +148,7 @@ integrations:
- integration-name: Google BigQuery
how-to-guide:
- /docs/apache-airflow-providers-google/operators/cloud/bigquery.rst
+ -
/docs/apache-airflow-providers-google/operators/cloud/bigquery_routines.rst
external-doc-url: https://cloud.google.com/bigquery/
logo: /docs/integration-logos/BigQuery.png
tags: [gcp]
diff --git
a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
index 8af8aecdb6a..1aa72d81935 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
@@ -47,6 +47,7 @@ from google.cloud.bigquery import (
)
from google.cloud.bigquery.dataset import AccessEntry, Dataset,
DatasetListItem, DatasetReference
from google.cloud.bigquery.retry import DEFAULT_JOB_RETRY
+from google.cloud.bigquery.routine import Routine, RoutineReference
from google.cloud.bigquery.table import (
Row,
RowIterator,
@@ -96,6 +97,21 @@ log = logging.getLogger(__name__)
BigQueryJob = CopyJob | QueryJob | LoadJob | ExtractJob
+# Property names passed as the ``fields`` argument of ``client.update_routine`` by
+# ``BigQueryHook.update_routine``, which always submits the merged resource with this full list.
+# NOTE(review): these look like ``google.cloud.bigquery.routine.Routine`` attribute names rather
+# than REST field names -- confirm against the client library before adding entries.
+_ROUTINE_WRITABLE_PROPERTIES: tuple[str, ...] = (
+ "type_",
+ "language",
+ "arguments",
+ "return_type",
+ "return_table_type",
+ "imported_libraries",
+ "body",
+ "description",
+ "determinism_level",
+ "remote_function_options",
+ "data_governance_type",
+ "external_runtime_options",
+)
+
class BigQueryHook(GoogleBaseHook, DbApiHook):
"""
@@ -1170,6 +1186,242 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
)
return table
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_routine(
+        self,
+        routine: Routine | dict[str, Any],
+        dataset_id: str | None = None,
+        routine_id: str | None = None,
+        project_id: str = PROVIDE_PROJECT_ID,
+        if_exists: str = "fail",
+        retry: Retry = DEFAULT_RETRY,
+        timeout: float | None = None,
+    ) -> Routine:
+        """
+        Create a new routine (UDF, procedure, or TVF) in the dataset.
+
+        :param routine: The routine to create. Either a
+            :class:`~google.cloud.bigquery.routine.Routine` instance or a dict in the format defined
+            at https://cloud.google.com/bigquery/docs/reference/rest/v2/routines#Routine. If the
+            routine reference is incomplete, ``dataset_id`` and ``routine_id`` are used to complete
+            it.
+        :param dataset_id: Optional. The dataset that will own the routine. Required if ``routine``
+            does not include a fully-qualified ``routineReference``.
+        :param routine_id: Optional. The routine identifier. Required if ``routine`` does not
+            include a fully-qualified ``routineReference``.
+        :param project_id: Optional. The project that owns the dataset. Falls back to the hook
+            default.
+        :param if_exists: What to do if a routine with the same identifier already exists:
+            ``"fail"`` (default) raises :class:`google.api_core.exceptions.Conflict`;
+            ``"skip"`` leaves the existing routine untouched and returns it;
+            ``"replace"`` deletes the existing routine and creates the new one. The delete and
+            the create are separate requests, so ``"replace"`` is not atomic: if the create
+            request fails, the previous routine is already gone.
+        :param retry: Optional. A retry object used to retry requests.
+        :param timeout: Optional. The amount of time, in seconds, to wait for the request.
+        :return: The created (or existing) routine.
+        :raises ValueError: If ``if_exists`` is not a supported value, or the routine reference
+            cannot be completed from ``routine``, ``dataset_id`` and ``routine_id``.
+        """
+        if if_exists not in {"fail", "skip", "replace"}:
+            raise ValueError(f"`if_exists` must be one of 'fail', 'skip', 'replace'; got {if_exists!r}")
+        routine = self._build_routine(
+            routine, project_id=project_id, dataset_id=dataset_id, routine_id=routine_id
+        )
+        client = self.get_client(project_id=project_id)
+        ref = routine.reference
+        routine_path = f"{ref.project}.{ref.dataset_id}.{ref.routine_id}"
+
+        if if_exists == "replace":
+            try:
+                client.delete_routine(ref, retry=retry, timeout=timeout)
+            except NotFound:
+                # Nothing to replace; fall through to a plain create.
+                pass
+            else:
+                self.log.info("Deleted existing routine before replace: %s", routine_path)
+
+        # Only "skip" tolerates a pre-existing routine; "fail" and "replace" must surface a conflict.
+        tolerate_existing = if_exists == "skip"
+        result = client.create_routine(
+            routine, exists_ok=tolerate_existing, retry=retry, timeout=timeout
+        )
+
+        if tolerate_existing:
+            # The client does not report which case occurred, so do not claim a creation.
+            self.log.info("Routine created or already present (left untouched): %s", routine_path)
+        else:
+            self.log.info("Created routine: %s", routine_path)
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_routine(
+        self,
+        routine: Routine | dict[str, Any],
+        fields: Sequence[str],
+        dataset_id: str | None = None,
+        routine_id: str | None = None,
+        project_id: str = PROVIDE_PROJECT_ID,
+        retry: Retry = DEFAULT_RETRY,
+        timeout: float | None = None,
+    ) -> Routine:
+        """
+        Update specified fields of an existing routine.
+
+        BigQuery's ``routines.update`` endpoint is a full-resource PUT (not a PATCH), so
+        this method fetches the existing routine, merges in the requested field changes,
+        and sends the complete resource back. A field listed in ``fields`` but absent in
+        ``routine`` is cleared on the server.
+
+        :param routine: The routine providing new values for the listed fields, either a
+            :class:`~google.cloud.bigquery.routine.Routine` or a Routine API dict (keys in
+            camelCase, e.g. ``{"description": ..., "definitionBody": ...}``).
+        :param fields: The routine properties to update, given as Routine API field names
+            (e.g. ``["description", "definitionBody"]``). A single string is treated as one
+            field name.
+        :param dataset_id: Optional. Used to complete the routine reference if missing.
+        :param routine_id: Optional. Used to complete the routine reference if missing.
+        :param project_id: Optional. The project that owns the dataset.
+        :param retry: Optional. A retry object used to retry requests.
+        :param timeout: Optional. The amount of time, in seconds, to wait for the request.
+        :return: The routine as returned by the update request.
+        :raises ValueError: If ``fields`` is empty, or the routine reference cannot be completed
+            from ``routine``, ``dataset_id`` and ``routine_id``.
+        """
+        if not fields:
+            raise ValueError("`fields` must be a non-empty sequence of routine properties to update.")
+        # A bare string is itself a sequence of one-character strings; iterating it would
+        # silently turn the update into a no-op instead of touching the named field.
+        field_names = [fields] if isinstance(fields, str) else list(fields)
+
+        if isinstance(routine, Routine):
+            updates_repr = routine.to_api_repr()
+        else:
+            updates_repr = dict(routine)
+
+        # Work on a copy of the reference so the caller's input is never modified.
+        ref_repr = dict(updates_repr.get("routineReference") or {})
+        ref_repr.setdefault("projectId", project_id)
+        if dataset_id:
+            ref_repr.setdefault("datasetId", dataset_id)
+        if routine_id:
+            ref_repr.setdefault("routineId", routine_id)
+
+        # Same completeness check as ``_build_routine`` so an incomplete reference fails early
+        # with an actionable message rather than inside the client library.
+        missing = [k for k in ("projectId", "datasetId", "routineId") if not ref_repr.get(k)]
+        if missing:
+            raise ValueError(
+                "Routine reference is missing required fields "
+                f"{missing!r}. Provide them via `dataset_id`/`routine_id` or in the routine "
+                "reference itself."
+            )
+
+        client = self.get_client(project_id=project_id)
+        existing = client.get_routine(
+            RoutineReference.from_api_repr(ref_repr), retry=retry, timeout=timeout
+        )
+        merged_repr = existing.to_api_repr()
+        for field in field_names:
+            if field in updates_repr:
+                merged_repr[field] = updates_repr[field]
+            else:
+                # Listed but not supplied: drop it so the server clears the value.
+                merged_repr.pop(field, None)
+
+        merged = Routine.from_api_repr(merged_repr)
+        result = client.update_routine(
+            merged, list(_ROUTINE_WRITABLE_PROPERTIES), retry=retry, timeout=timeout
+        )
+        out_ref = result.reference
+        self.log.info("Updated routine: %s.%s.%s", out_ref.project, out_ref.dataset_id, out_ref.routine_id)
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_routine(
+        self,
+        dataset_id: str,
+        routine_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        not_found_ok: bool = True,
+        retry: Retry = DEFAULT_RETRY,
+        timeout: float | None = None,
+    ) -> None:
+        """
+        Remove a routine from a dataset.
+
+        :param dataset_id: The dataset that owns the routine.
+        :param routine_id: The identifier of the routine to delete.
+        :param project_id: Optional. The project that owns the dataset.
+        :param not_found_ok: If ``True`` (default), a missing routine is logged and ignored;
+            if ``False``, the "not found" error is re-raised.
+        :param retry: Optional. A retry object used to retry requests.
+        :param timeout: Optional. The amount of time, in seconds, to wait for the request.
+        """
+        reference = RoutineReference.from_api_repr(
+            {"projectId": project_id, "datasetId": dataset_id, "routineId": routine_id}
+        )
+        routine_path = f"{project_id}.{dataset_id}.{routine_id}"
+        try:
+            client = self.get_client(project_id=project_id)
+            client.delete_routine(reference, retry=retry, timeout=timeout)
+        except NotFound:
+            if not_found_ok:
+                self.log.info("Routine not found, ignoring: %s", routine_path)
+                return
+            raise
+        self.log.info("Deleted routine: %s", routine_path)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_routine(
+        self,
+        dataset_id: str,
+        routine_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        retry: Retry = DEFAULT_RETRY,
+        timeout: float | None = None,
+    ) -> Routine:
+        """
+        Fetch a single routine's metadata.
+
+        :param dataset_id: The dataset that owns the routine.
+        :param routine_id: The identifier of the routine to fetch.
+        :param project_id: Optional. The project that owns the dataset.
+        :param retry: Optional. A retry object used to retry requests.
+        :param timeout: Optional. The amount of time, in seconds, to wait for the request.
+        :return: The routine returned by the BigQuery client.
+        """
+        reference = RoutineReference.from_api_repr(
+            {"projectId": project_id, "datasetId": dataset_id, "routineId": routine_id}
+        )
+        client = self.get_client(project_id=project_id)
+        return client.get_routine(reference, retry=retry, timeout=timeout)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def list_routines(
+        self,
+        dataset_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        max_results: int | None = None,
+        page_token: str | None = None,
+        retry: Retry = DEFAULT_RETRY,
+        timeout: float | None = None,
+    ) -> list[Routine]:
+        """
+        Return the routines of a dataset as a list.
+
+        The iterator returned by the client is fully consumed before returning.
+
+        :param dataset_id: The dataset to list routines for.
+        :param project_id: Optional. The project that owns the dataset.
+        :param max_results: Optional. The maximum number of routines to return.
+        :param page_token: Optional. An opaque token identifying the page of results to return.
+        :param retry: Optional. A retry object used to retry requests.
+        :param timeout: Optional. The amount of time, in seconds, to wait for the request.
+        :return: The list of routines. Only a subset of fields is populated; fetch individual
+            routines via :meth:`get_routine` for the complete resource.
+        """
+        dataset_ref = DatasetReference(project=project_id, dataset_id=dataset_id)
+        client = self.get_client(project_id=project_id)
+        routines = client.list_routines(
+            dataset_ref,
+            max_results=max_results,
+            page_token=page_token,
+            retry=retry,
+            timeout=timeout,
+        )
+        return [*routines]
+
+    @staticmethod
+    def _build_routine(
+        routine: Routine | dict[str, Any],
+        project_id: str,
+        dataset_id: str | None,
+        routine_id: str | None,
+    ) -> Routine:
+        """
+        Return a :class:`Routine` with a fully-qualified reference, filling gaps from kwargs.
+
+        The input is never modified: both the top-level resource and its nested
+        ``routineReference`` are copied before defaults are applied.
+
+        :param routine: A ``Routine`` instance or a Routine API dict.
+        :param project_id: Project used when the reference has no ``projectId``.
+        :param dataset_id: Dataset used when the reference has no ``datasetId``.
+        :param routine_id: Routine identifier used when the reference has no ``routineId``.
+        :return: A new ``Routine`` built from the completed resource.
+        :raises ValueError: If any of ``projectId``, ``datasetId`` or ``routineId`` is still
+            missing after applying the defaults.
+        """
+        source = routine.to_api_repr() if isinstance(routine, Routine) else routine
+        # Shallow-copy the resource; the representation may be shared with the caller's object.
+        resource = dict(source)
+
+        # Copy the nested reference too (``or {}`` also covers an explicit ``None``), so the
+        # ``setdefault`` calls below cannot leak into the caller's dict or ``Routine``.
+        ref = dict(resource.get("routineReference") or {})
+        resource["routineReference"] = ref
+
+        ref.setdefault("projectId", project_id)
+        if dataset_id:
+            ref.setdefault("datasetId", dataset_id)
+        if routine_id:
+            ref.setdefault("routineId", routine_id)
+
+        missing = [k for k in ("projectId", "datasetId", "routineId") if not ref.get(k)]
+        if missing:
+            raise ValueError(
+                "Routine reference is missing required fields "
+                f"{missing!r}. Provide them via `dataset_id`/`routine_id` or in the routine "
+                "reference itself."
+            )
+        return Routine.from_api_repr(resource)
+
@GoogleBaseHook.fallback_to_default_project_id
def poll_job_complete(
self,
diff --git
a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
index 280ae79422b..120350d5b39 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
@@ -30,6 +30,7 @@ from typing import TYPE_CHECKING, Any, SupportsAbs
from google.api_core.exceptions import Conflict
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.cloud.bigquery import DEFAULT_RETRY, CopyJob, ExtractJob, LoadJob,
QueryJob, Row
+from google.cloud.bigquery.routine import Routine
from google.cloud.bigquery.table import RowIterator, Table, TableListItem,
TableReference
from airflow.exceptions import AirflowProviderDeprecationWarning
@@ -2570,3 +2571,494 @@ class
BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryInsertJobOpera
)
else:
self.log.info("Skipping to cancel job: %s:%s.%s", self.project_id,
self.location, self.job_id)
+
+
+class BigQueryCreateRoutineOperator(GoogleCloudBaseOperator):
+    """
+    Create a BigQuery routine (UDF, stored procedure, table-valued function, or aggregate).
+
+    A routine is described by a ``Routine`` resource, documented at
+    https://cloud.google.com/bigquery/docs/reference/rest/v2/routines#Routine. Supply the whole
+    resource through ``routine_resource``, individual fields through the keyword arguments, or
+    both. At execute time every keyword argument that is not ``None`` is added to the resource
+    only if the resource does not already define that field, so ``routine_resource`` wins on
+    conflicts.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryCreateRoutineOperator`
+
+    :param dataset_id: The dataset that will own the routine.
+    :param routine_id: The identifier of the routine to create.
+    :param routine_resource: The routine resource as a dict or
+        :class:`~google.cloud.bigquery.routine.Routine`. If omitted, the resource is assembled
+        from the individual keyword arguments below.
+    :param project_id: Optional. The project that owns the dataset. Falls back to the connection's
+        default.
+    :param routine_type: Optional. One of ``"SCALAR_FUNCTION"``, ``"PROCEDURE"``,
+        ``"TABLE_VALUED_FUNCTION"``, ``"AGGREGATE_FUNCTION"``.
+    :param language: Optional. ``"SQL"`` or ``"JAVASCRIPT"``.
+    :param definition_body: Optional. The body of the routine (SQL or JavaScript).
+    :param arguments: Optional. Sequence of argument dicts in the Google API representation
+        (see the ``Routine.Argument`` schema).
+    :param return_type: Optional. The return type of the routine as a ``StandardSqlDataType``
+        dict.
+    :param return_table_type: Optional. The return table type for table-valued functions as a
+        ``StandardSqlTableType`` dict.
+    :param imported_libraries: Optional. URIs of Cloud Storage libraries to import (JavaScript
+        UDFs only).
+    :param determinism_level: Optional. Determinism level for the routine.
+    :param security_mode: Optional. Security mode (``"DEFINER"`` or ``"INVOKER"``).
+    :param data_governance_type: Optional. Data governance type for the routine.
+    :param description: Optional. Description of the routine.
+    :param remote_function_options: Optional. Options for remote functions.
+    :param spark_options: Optional. Options for Spark stored procedures.
+    :param if_exists: What to do if a routine with the same identifier already exists.
+        ``"fail"`` (default) raises, ``"skip"`` returns the existing routine, ``"replace"``
+        deletes the existing routine and creates the new one.
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud.
+    :param location: The location of the BigQuery dataset.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts.
+    :param retry: A retry object used to retry requests.
+    :param timeout: The amount of time, in seconds, to wait for the request.
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "dataset_id",
+        "routine_id",
+        "definition_body",
+        "arguments",
+        "routine_resource",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+    template_fields_renderers = {"routine_resource": "json", "arguments": "json"}
+    template_ext: Sequence[str] = (".sql",)
+    ui_color = BigQueryUIColors.TABLE.value
+
+    def __init__(
+        self,
+        *,
+        dataset_id: str,
+        routine_id: str,
+        routine_resource: dict[str, Any] | Routine | None = None,
+        project_id: str = PROVIDE_PROJECT_ID,
+        routine_type: str | None = None,
+        language: str | None = None,
+        definition_body: str | None = None,
+        arguments: Sequence[dict[str, Any]] | None = None,
+        return_type: dict[str, Any] | None = None,
+        return_table_type: dict[str, Any] | None = None,
+        imported_libraries: Sequence[str] | None = None,
+        determinism_level: str | None = None,
+        security_mode: str | None = None,
+        data_governance_type: str | None = None,
+        description: str | None = None,
+        remote_function_options: dict[str, Any] | None = None,
+        spark_options: dict[str, Any] | None = None,
+        if_exists: str = "fail",
+        gcp_conn_id: str = "google_cloud_default",
+        location: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+        retry: Retry = DEFAULT_RETRY,
+        timeout: float | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        # Reject an unsupported collision policy when the operator is constructed.
+        if if_exists not in {"fail", "skip", "replace"}:
+            raise ValueError(f"`if_exists` must be one of 'fail', 'skip', 'replace'; got {if_exists!r}")
+
+        # Where the routine lives.
+        self.project_id = project_id
+        self.dataset_id = dataset_id
+        self.routine_id = routine_id
+
+        # What the routine is: a whole resource and/or individual fields.
+        self.routine_resource = routine_resource
+        self.routine_type = routine_type
+        self.language = language
+        self.definition_body = definition_body
+        self.arguments = arguments
+        self.return_type = return_type
+        self.return_table_type = return_table_type
+        self.imported_libraries = imported_libraries
+        self.determinism_level = determinism_level
+        self.security_mode = security_mode
+        self.data_governance_type = data_governance_type
+        self.description = description
+        self.remote_function_options = remote_function_options
+        self.spark_options = spark_options
+
+        # How the request is made.
+        self.if_exists = if_exists
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.impersonation_chain = impersonation_chain
+        self.retry = retry
+        self.timeout = timeout
+
+    def _build_resource(self) -> dict[str, Any]:
+        """Assemble the Routine API dict from ``routine_resource`` plus the keyword fields."""
+        base = self.routine_resource
+        if base is None:
+            resource: dict[str, Any] = {}
+        elif isinstance(base, Routine):
+            resource = base.to_api_repr()
+        else:
+            resource = dict(base)
+
+        # Sequences are converted to lists; ``None`` stays ``None`` so the field is skipped.
+        arguments = None if self.arguments is None else list(self.arguments)
+        imported_libraries = None if self.imported_libraries is None else list(self.imported_libraries)
+
+        keyword_fields = (
+            ("routineType", self.routine_type),
+            ("language", self.language),
+            ("definitionBody", self.definition_body),
+            ("arguments", arguments),
+            ("returnType", self.return_type),
+            ("returnTableType", self.return_table_type),
+            ("importedLibraries", imported_libraries),
+            ("determinismLevel", self.determinism_level),
+            ("securityMode", self.security_mode),
+            ("dataGovernanceType", self.data_governance_type),
+            ("description", self.description),
+            ("remoteFunctionOptions", self.remote_function_options),
+            ("sparkOptions", self.spark_options),
+        )
+        for api_key, value in keyword_fields:
+            if value is None:
+                continue
+            # Fields already present in the resource are kept as they are.
+            resource.setdefault(api_key, value)
+        return resource
+
+    def execute(self, context: Context) -> dict[str, Any]:
+        """Create the routine through :class:`BigQueryHook` and return its API representation."""
+        hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+        routine_definition = self._build_resource()
+        self.log.info(
+            "Creating routine %s.%s.%s (if_exists=%s)",
+            self.project_id or hook.project_id,
+            self.dataset_id,
+            self.routine_id,
+            self.if_exists,
+        )
+        created = hook.create_routine(
+            routine=routine_definition,
+            dataset_id=self.dataset_id,
+            routine_id=self.routine_id,
+            project_id=self.project_id,
+            if_exists=self.if_exists,
+            retry=self.retry,
+            timeout=self.timeout,
+        )
+        return created.to_api_repr()
+
+
+class BigQueryUpdateRoutineOperator(GoogleCloudBaseOperator):
+    """
+    Patch selected fields of an existing BigQuery routine.
+
+    Only the fields named in ``fields`` change. A field that is named there but not set in
+    ``routine_resource`` is cleared on the server.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryUpdateRoutineOperator`
+
+    :param dataset_id: The dataset that owns the routine.
+    :param routine_id: The identifier of the routine to update.
+    :param routine_resource: The routine resource (dict or
+        :class:`~google.cloud.bigquery.routine.Routine`) with the new values for the fields
+        listed in ``fields``.
+    :param fields: Properties to update (e.g. ``["definitionBody", "description"]``).
+    :param project_id: Optional. The project that owns the dataset.
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud.
+    :param location: The location of the BigQuery dataset.
+    :param impersonation_chain: Optional service account to impersonate.
+    :param retry: A retry object used to retry requests.
+    :param timeout: The amount of time, in seconds, to wait for the request.
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "dataset_id",
+        "routine_id",
+        "routine_resource",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+    template_fields_renderers = {"routine_resource": "json"}
+    template_ext: Sequence[str] = (".json", ".sql")
+    ui_color = BigQueryUIColors.TABLE.value
+
+    def __init__(
+        self,
+        *,
+        dataset_id: str,
+        routine_id: str,
+        routine_resource: dict[str, Any] | Routine,
+        fields: Sequence[str],
+        project_id: str = PROVIDE_PROJECT_ID,
+        gcp_conn_id: str = "google_cloud_default",
+        location: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+        retry: Retry = DEFAULT_RETRY,
+        timeout: float | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        # An update without any field to change is rejected up front.
+        if not fields:
+            raise ValueError("`fields` must be a non-empty sequence of routine properties to update.")
+
+        # Target routine.
+        self.project_id = project_id
+        self.dataset_id = dataset_id
+        self.routine_id = routine_id
+
+        # Requested change; ``fields`` is stored as a list copy.
+        self.routine_resource = routine_resource
+        self.fields = list(fields)
+
+        # Connection and request settings.
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.impersonation_chain = impersonation_chain
+        self.retry = retry
+        self.timeout = timeout
+
+    def execute(self, context: Context) -> dict[str, Any]:
+        """Apply the update through :class:`BigQueryHook` and return the routine's API representation."""
+        hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info(
+            "Updating routine %s.%s.%s (fields=%s)",
+            self.project_id or hook.project_id,
+            self.dataset_id,
+            self.routine_id,
+            self.fields,
+        )
+        updated = hook.update_routine(
+            routine=self.routine_resource,
+            fields=self.fields,
+            dataset_id=self.dataset_id,
+            routine_id=self.routine_id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+        )
+        return updated.to_api_repr()
+
+
+class BigQueryDeleteRoutineOperator(GoogleCloudBaseOperator):
+    """
+    Delete a BigQuery routine.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryDeleteRoutineOperator`
+
+    :param dataset_id: The dataset that owns the routine.
+    :param routine_id: The identifier of the routine to delete.
+    :param project_id: Optional. The project that owns the dataset.
+    :param ignore_if_missing: If ``True``, do not fail when the routine does not exist.
+        Defaults to ``False``.
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud.
+    :param location: The location of the BigQuery dataset.
+    :param impersonation_chain: Optional service account to impersonate.
+    :param retry: A retry object used to retry requests.
+    :param timeout: The amount of time, in seconds, to wait for the request.
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "dataset_id",
+        "routine_id",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+    ui_color = BigQueryUIColors.TABLE.value
+
+    def __init__(
+        self,
+        *,
+        dataset_id: str,
+        routine_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        ignore_if_missing: bool = False,
+        gcp_conn_id: str = "google_cloud_default",
+        location: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+        retry: Retry = DEFAULT_RETRY,
+        timeout: float | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        # Target routine.
+        self.project_id = project_id
+        self.dataset_id = dataset_id
+        self.routine_id = routine_id
+        # Forwarded to the hook as ``not_found_ok`` at execute time.
+        self.ignore_if_missing = ignore_if_missing
+        # Connection and request settings.
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.impersonation_chain = impersonation_chain
+        self.retry = retry
+        self.timeout = timeout
+
+    def execute(self, context: Context) -> None:
+        """Delete the routine through :class:`BigQueryHook`."""
+        hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info(
+            "Deleting routine %s.%s.%s",
+            self.project_id or hook.project_id,
+            self.dataset_id,
+            self.routine_id,
+        )
+        hook.delete_routine(
+            dataset_id=self.dataset_id,
+            routine_id=self.routine_id,
+            project_id=self.project_id,
+            not_found_ok=self.ignore_if_missing,
+            retry=self.retry,
+            timeout=self.timeout,
+        )
+
+
+class BigQueryGetRoutineOperator(GoogleCloudBaseOperator):
+    """
+    Look up an existing BigQuery routine and push its serialized API representation to XCom.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryGetRoutineOperator`
+
+    :param dataset_id: The dataset that owns the routine.
+    :param routine_id: The identifier of the routine to fetch.
+    :param project_id: Optional. The project that owns the dataset.
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud.
+    :param location: The location of the BigQuery dataset.
+    :param impersonation_chain: Optional service account to impersonate.
+    :param retry: A retry object used to retry requests.
+    :param timeout: The amount of time, in seconds, to wait for the request.
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "dataset_id",
+        "routine_id",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+    ui_color = BigQueryUIColors.TABLE.value
+
+    def __init__(
+        self,
+        *,
+        dataset_id: str,
+        routine_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        gcp_conn_id: str = "google_cloud_default",
+        location: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+        retry: Retry = DEFAULT_RETRY,
+        timeout: float | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        # Which routine to fetch.
+        self.project_id = project_id
+        self.dataset_id = dataset_id
+        self.routine_id = routine_id
+        # How to reach BigQuery.
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.impersonation_chain = impersonation_chain
+        # Per-request behaviour.
+        self.retry = retry
+        self.timeout = timeout
+
+    def execute(self, context: Context) -> dict[str, Any]:
+        """
+        Fetch the routine through the hook.
+
+        :param context: Airflow task context (not read by this method).
+        :return: The API representation of the fetched routine.
+        """
+        bq_hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+        # Fall back to the hook's project for the log line when none was given explicitly.
+        effective_project = self.project_id or bq_hook.project_id
+        self.log.info(
+            "Fetching routine %s.%s.%s",
+            effective_project,
+            self.dataset_id,
+            self.routine_id,
+        )
+        return bq_hook.get_routine(
+            dataset_id=self.dataset_id,
+            routine_id=self.routine_id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+        ).to_api_repr()
+
+
+class BigQueryListRoutinesOperator(GoogleCloudBaseOperator):
+    """
+    Enumerate the routines of a BigQuery dataset and push them to XCom as a list.
+
+    Each returned item is an API representation. List responses populate only a subset of
+    the routine fields; use :class:`BigQueryGetRoutineOperator` on an individual routine
+    to obtain the complete resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryListRoutinesOperator`
+
+    :param dataset_id: The dataset to list routines for.
+    :param project_id: Optional. The project that owns the dataset.
+    :param max_results: Optional. Maximum number of routines to return.
+    :param page_token: Optional. Token identifying a page of results to return.
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud.
+    :param location: The location of the BigQuery dataset.
+    :param impersonation_chain: Optional service account to impersonate.
+    :param retry: A retry object used to retry requests.
+    :param timeout: The amount of time, in seconds, to wait for the request.
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "dataset_id",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+    ui_color = BigQueryUIColors.TABLE.value
+
+    def __init__(
+        self,
+        *,
+        dataset_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        max_results: int | None = None,
+        page_token: str | None = None,
+        gcp_conn_id: str = "google_cloud_default",
+        location: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+        retry: Retry = DEFAULT_RETRY,
+        timeout: float | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        # Which dataset to list and how to page through it.
+        self.project_id = project_id
+        self.dataset_id = dataset_id
+        self.max_results = max_results
+        self.page_token = page_token
+        # How to reach BigQuery.
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.impersonation_chain = impersonation_chain
+        # Per-request behaviour.
+        self.retry = retry
+        self.timeout = timeout
+
+    def execute(self, context: Context) -> list[dict[str, Any]]:
+        """
+        List the dataset's routines through the hook.
+
+        :param context: Airflow task context (not read by this method).
+        :return: One API representation per routine returned by the hook.
+        """
+        bq_hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+        # Fall back to the hook's project for the log line when none was given explicitly.
+        effective_project = self.project_id or bq_hook.project_id
+        self.log.info("Listing routines in %s.%s", effective_project, self.dataset_id)
+        listed = bq_hook.list_routines(
+            dataset_id=self.dataset_id,
+            project_id=self.project_id,
+            max_results=self.max_results,
+            page_token=self.page_token,
+            retry=self.retry,
+            timeout=self.timeout,
+        )
+        return [item.to_api_repr() for item in listed]
diff --git a/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py b/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py
index 5b891bb075c..71e37bff784 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py
@@ -24,6 +24,8 @@ from collections.abc import Sequence
from datetime import timedelta
from typing import TYPE_CHECKING, Any
+from google.api_core.exceptions import NotFound
+
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.common.compat.sdk import AirflowException,
BaseSensorOperator, conf
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
@@ -148,6 +150,67 @@ class BigQueryTableExistenceSensor(BaseSensorOperator):
raise AirflowException(message)
+class BigQueryRoutineExistenceSensor(BaseSensorOperator):
+    """
+    Wait until a routine (UDF, procedure, or TVF) exists in a BigQuery dataset.
+
+    :param project_id: The Google Cloud project that owns the dataset.
+    :param dataset_id: The dataset that owns the routine.
+    :param routine_id: The identifier of the routine to check.
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        A string value names a single account, which must grant the originating account
+        the Service Account Token Creator IAM role.
+        A sequence value names a chain in which every identity must grant the
+        Service Account Token Creator IAM role to the directly preceding identity, with
+        the first account granting this role to the originating account (templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "dataset_id",
+        "routine_id",
+        "impersonation_chain",
+    )
+    ui_color = "#f0eee4"
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        dataset_id: str,
+        routine_id: str,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        # Fully qualified location of the routine to look for.
+        self.project_id = project_id
+        self.dataset_id = dataset_id
+        self.routine_id = routine_id
+        # Connection settings.
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def poke(self, context: Context) -> bool:
+        """
+        Check once whether the routine can be fetched.
+
+        :param context: Airflow task context (not read by this method).
+        :return: ``False`` when the lookup raises ``NotFound``, ``True`` otherwise.
+        """
+        self.log.info(
+            "Sensor checks existence of routine: %s",
+            f"{self.project_id}.{self.dataset_id}.{self.routine_id}",
+        )
+        bq_hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+        try:
+            bq_hook.get_routine(
+                project_id=self.project_id,
+                dataset_id=self.dataset_id,
+                routine_id=self.routine_id,
+            )
+        except NotFound:
+            # Only a missing routine means "keep waiting"; other errors propagate.
+            return False
+        else:
+            return True
+
+
class BigQueryTablePartitionExistenceSensor(BaseSensorOperator):
"""
Checks for the existence of a partition within a table in Google Bigquery.
diff --git a/providers/google/src/airflow/providers/google/get_provider_info.py b/providers/google/src/airflow/providers/google/get_provider_info.py
index d0122a94c51..c172eee6e87 100644
--- a/providers/google/src/airflow/providers/google/get_provider_info.py
+++ b/providers/google/src/airflow/providers/google/get_provider_info.py
@@ -52,7 +52,10 @@ def get_provider_info():
},
{
"integration-name": "Google BigQuery",
- "how-to-guide":
["/docs/apache-airflow-providers-google/operators/cloud/bigquery.rst"],
+ "how-to-guide": [
+
"/docs/apache-airflow-providers-google/operators/cloud/bigquery.rst",
+
"/docs/apache-airflow-providers-google/operators/cloud/bigquery_routines.rst",
+ ],
"external-doc-url": "https://cloud.google.com/bigquery/",
"logo": "/docs/integration-logos/BigQuery.png",
"tags": ["gcp"],
diff --git a/providers/google/tests/system/google/cloud/bigquery/example_bigquery_routines.py b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_routines.py
new file mode 100644
index 00000000000..03d238ef1b0
--- /dev/null
+++ b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_routines.py
@@ -0,0 +1,219 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG for Google BigQuery service testing routine operations.
+
+Exercises the full BigQuery routines lifecycle through Airflow: create a
scalar SQL
+UDF, a stored procedure, and a table-valued function; verify their existence;
list
+and fetch them; update one; and delete them all.
+"""
+
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow.models.dag import DAG
+from airflow.providers.google.cloud.operators.bigquery import (
+ BigQueryCreateEmptyDatasetOperator,
+ BigQueryCreateRoutineOperator,
+ BigQueryDeleteDatasetOperator,
+ BigQueryDeleteRoutineOperator,
+ BigQueryGetRoutineOperator,
+ BigQueryListRoutinesOperator,
+ BigQueryUpdateRoutineOperator,
+)
+from airflow.providers.google.cloud.sensors.bigquery import
BigQueryRoutineExistenceSensor
+
+try:
+ from airflow.sdk import TriggerRule
+except ImportError:
+ # Compatibility for Airflow < 3.1
+ from airflow.utils.trigger_rule import TriggerRule # type:
ignore[no-redef,attr-defined]
+
+# Environment-driven identifiers; both fall back to "default" when unset.
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or "default"
+DAG_ID = "bigquery_routines"
+
+# Resource names derived from the DAG and environment identifiers.
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
+SCALAR_ROUTINE = f"scalar_udf_{ENV_ID}"
+PROCEDURE_ROUTINE = f"procedure_{ENV_ID}"
+TVF_ROUTINE = f"tvf_{ENV_ID}"
+
+# Reusable data-type descriptors for routine arguments and return values.
+INT64_TYPE = {"typeKind": "INT64"}
+STRING_TYPE = {"typeKind": "STRING"}
+
+
+with DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "bigquery"],
+) as dag:
+    create_dataset = BigQueryCreateEmptyDatasetOperator(
+        task_id="create_dataset",
+        dataset_id=DATASET_NAME,
+        project_id=PROJECT_ID,
+    )
+
+    # [START howto_operator_bigquery_create_scalar_routine]
+    create_scalar_routine = BigQueryCreateRoutineOperator(
+        task_id="create_scalar_routine",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        routine_id=SCALAR_ROUTINE,
+        routine_type="SCALAR_FUNCTION",
+        language="SQL",
+        arguments=[{"name": "x", "dataType": INT64_TYPE}],
+        return_type=INT64_TYPE,
+        definition_body="x + 1",
+        description="Adds one to its argument.",
+    )
+    # [END howto_operator_bigquery_create_scalar_routine]
+
+    # [START howto_operator_bigquery_create_procedure_routine]
+    create_procedure = BigQueryCreateRoutineOperator(
+        task_id="create_procedure",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        routine_id=PROCEDURE_ROUTINE,
+        routine_type="PROCEDURE",
+        language="SQL",
+        arguments=[
+            {"name": "prefix", "dataType": STRING_TYPE, "argumentKind": "FIXED_TYPE"},
+        ],
+        definition_body="BEGIN SELECT CONCAT(prefix, ' world') AS greeting; END",
+        description="Echoes a prefixed greeting.",
+    )
+    # [END howto_operator_bigquery_create_procedure_routine]
+
+    # [START howto_operator_bigquery_create_tvf_routine]
+    create_tvf = BigQueryCreateRoutineOperator(
+        task_id="create_tvf",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        routine_id=TVF_ROUTINE,
+        routine_type="TABLE_VALUED_FUNCTION",
+        language="SQL",
+        arguments=[{"name": "n", "dataType": INT64_TYPE}],
+        return_table_type={
+            "columns": [
+                {"name": "value", "type": INT64_TYPE},
+            ]
+        },
+        definition_body="SELECT * FROM UNNEST(GENERATE_ARRAY(1, n)) AS value",
+        description="Generates integers 1..n as a table.",
+    )
+    # [END howto_operator_bigquery_create_tvf_routine]
+
+    # [START howto_sensor_bigquery_routine_existence]
+    wait_for_routine = BigQueryRoutineExistenceSensor(
+        task_id="wait_for_routine",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        routine_id=SCALAR_ROUTINE,
+        timeout=60,
+        poke_interval=5,
+    )
+    # [END howto_sensor_bigquery_routine_existence]
+
+    # [START howto_operator_bigquery_update_routine]
+    update_routine = BigQueryUpdateRoutineOperator(
+        task_id="update_routine",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        routine_id=SCALAR_ROUTINE,
+        routine_resource={"description": "Updated description for scalar UDF"},
+        fields=["description"],
+    )
+    # [END howto_operator_bigquery_update_routine]
+
+    # [START howto_operator_bigquery_get_routine]
+    get_routine = BigQueryGetRoutineOperator(
+        task_id="get_routine",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        routine_id=SCALAR_ROUTINE,
+    )
+    # [END howto_operator_bigquery_get_routine]
+
+    # [START howto_operator_bigquery_list_routines]
+    list_routines = BigQueryListRoutinesOperator(
+        task_id="list_routines",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+    )
+    # [END howto_operator_bigquery_list_routines]
+
+    # [START howto_operator_bigquery_delete_routine]
+    delete_scalar_routine = BigQueryDeleteRoutineOperator(
+        task_id="delete_scalar_routine",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        routine_id=SCALAR_ROUTINE,
+        ignore_if_missing=True,
+    )
+    # [END howto_operator_bigquery_delete_routine]
+
+    delete_procedure = BigQueryDeleteRoutineOperator(
+        task_id="delete_procedure",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        routine_id=PROCEDURE_ROUTINE,
+        ignore_if_missing=True,
+    )
+    delete_tvf = BigQueryDeleteRoutineOperator(
+        task_id="delete_tvf",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        routine_id=TVF_ROUTINE,
+        ignore_if_missing=True,
+    )
+
+    delete_dataset = BigQueryDeleteDatasetOperator(
+        task_id="delete_dataset",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        delete_contents=True,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    # The three creations run in parallel, as do the three deletions.
+    create_tasks = [create_scalar_routine, create_procedure, create_tvf]
+    delete_tasks = [delete_scalar_routine, delete_procedure, delete_tvf]
+
+    (
+        # TEST SETUP
+        create_dataset
+        # TEST BODY
+        >> create_tasks
+        >> wait_for_routine
+        >> update_routine
+        >> get_routine
+        >> list_routines
+        >> delete_tasks
+        # TEST TEARDOWN
+        >> delete_dataset
+    )
+
+    from tests_common.test_utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests_common.test_utils.system_tests import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see:
contributing-docs/testing/system_tests.rst)
+test_run = get_test_run(dag)
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
index 054eefb1a11..ddda199849c 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
@@ -36,6 +36,7 @@ from google.cloud.bigquery import (
TableReference,
)
from google.cloud.bigquery.dataset import AccessEntry, Dataset, DatasetListItem
+from google.cloud.bigquery.routine import Routine
from google.cloud.bigquery.table import _EmptyRowIterator
from google.cloud.exceptions import NotFound
@@ -1012,6 +1013,177 @@ class TestTableOperations(_BigQueryBaseTestClass):
)
+@pytest.mark.db_test
+class TestRoutineOperations(_BigQueryBaseTestClass):
+    """Hook-level tests for the routine create/update/delete/get/list helpers."""
+
+    # Patch target shared by every test that needs a mocked BigQuery client.
+    GET_CLIENT_PATH = "airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client"
+
+    ROUTINE_ID = "bq_routine"
+    ROUTINE_REF_REPR = {
+        "projectId": PROJECT_ID,
+        "datasetId": DATASET_ID,
+        "routineId": ROUTINE_ID,
+    }
+    ROUTINE_RESOURCE = {
+        "routineReference": ROUTINE_REF_REPR,
+        "routineType": "SCALAR_FUNCTION",
+        "language": "SQL",
+        "definitionBody": "x + 1",
+        "arguments": [{"name": "x", "dataType": {"typeKind": "INT64"}}],
+        "returnType": {"typeKind": "INT64"},
+    }
+
+    @mock.patch(GET_CLIENT_PATH)
+    def test_create_routine_fail_mode(self, mock_client):
+        self.hook.create_routine(routine={**self.ROUTINE_RESOURCE}, project_id=PROJECT_ID)
+
+        mock_client.assert_called_once_with(project_id=PROJECT_ID)
+        client = mock_client.return_value
+        assert client.create_routine.call_args.kwargs["exists_ok"] is False
+        client.delete_routine.assert_not_called()
+
+    @mock.patch(GET_CLIENT_PATH)
+    def test_create_routine_skip_mode(self, mock_client):
+        self.hook.create_routine(
+            routine={**self.ROUTINE_RESOURCE},
+            project_id=PROJECT_ID,
+            if_exists="skip",
+        )
+
+        client = mock_client.return_value
+        assert client.create_routine.call_args.kwargs["exists_ok"] is True
+        client.delete_routine.assert_not_called()
+
+    @mock.patch(GET_CLIENT_PATH)
+    def test_create_routine_replace_mode_deletes_first(self, mock_client):
+        self.hook.create_routine(
+            routine={**self.ROUTINE_RESOURCE},
+            project_id=PROJECT_ID,
+            if_exists="replace",
+        )
+
+        client = mock_client.return_value
+        client.delete_routine.assert_called_once()
+        assert client.create_routine.call_args.kwargs["exists_ok"] is False
+
+    @mock.patch(GET_CLIENT_PATH)
+    def test_create_routine_replace_ignores_not_found_on_delete(self, mock_client):
+        client = mock_client.return_value
+        client.delete_routine.side_effect = NotFound("nope")
+
+        self.hook.create_routine(
+            routine={**self.ROUTINE_RESOURCE},
+            project_id=PROJECT_ID,
+            if_exists="replace",
+        )
+
+        client.create_routine.assert_called_once()
+
+    def test_create_routine_invalid_if_exists(self):
+        with pytest.raises(ValueError, match="must be one of"):
+            self.hook.create_routine(
+                routine={**self.ROUTINE_RESOURCE},
+                project_id=PROJECT_ID,
+                if_exists="bogus",
+            )
+
+    @mock.patch(GET_CLIENT_PATH)
+    def test_create_routine_fills_missing_reference(self, mock_client):
+        # The body deliberately carries no "routineReference" entry.
+        reference_free_body = {
+            "routineType": "SCALAR_FUNCTION",
+            "language": "SQL",
+            "definitionBody": "1",
+        }
+        self.hook.create_routine(
+            routine=reference_free_body,
+            dataset_id=DATASET_ID,
+            routine_id=self.ROUTINE_ID,
+            project_id=PROJECT_ID,
+        )
+
+        create_call = mock_client.return_value.create_routine.call_args
+        # Accept the routine whether it was passed by keyword or positionally.
+        if "routine" in create_call.kwargs:
+            created_routine = create_call.kwargs["routine"]
+        else:
+            created_routine = create_call.args[0]
+        reference = created_routine.reference
+        assert reference.project == PROJECT_ID
+        assert reference.dataset_id == DATASET_ID
+        assert reference.routine_id == self.ROUTINE_ID
+
+    def test_create_routine_missing_reference_raises(self):
+        with pytest.raises(ValueError, match="missing required fields"):
+            self.hook.create_routine(
+                routine={"routineType": "SCALAR_FUNCTION", "language": "SQL", "definitionBody": "1"},
+                project_id=PROJECT_ID,
+            )
+
+    @mock.patch(GET_CLIENT_PATH)
+    def test_update_routine(self, mock_client):
+        client = mock_client.return_value
+        client.get_routine.return_value = Routine.from_api_repr(
+            {**self.ROUTINE_RESOURCE, "description": "original"}
+        )
+
+        self.hook.update_routine(
+            routine={"description": "new", "definitionBody": "x + 2"},
+            fields=["description", "definitionBody"],
+            dataset_id=DATASET_ID,
+            routine_id=self.ROUTINE_ID,
+            project_id=PROJECT_ID,
+        )
+
+        client.get_routine.assert_called_once()
+        update_call = client.update_routine.call_args
+        positional = update_call.args
+        # Accept routine/fields whether they were passed positionally or by keyword.
+        sent_routine = positional[0] if positional else update_call.kwargs["routine"]
+        sent_fields = positional[1] if len(positional) > 1 else update_call.kwargs["fields"]
+        # Merged resource carries the updated values plus the untouched original metadata.
+        assert sent_routine.description == "new"
+        assert sent_routine.body == "x + 2"
+        assert sent_routine.type_ == "SCALAR_FUNCTION"
+        # Full-resource PUT: all writable properties are included in the outbound fields list.
+        assert "description" in sent_fields
+        assert "body" in sent_fields
+        assert "type_" in sent_fields
+
+    @mock.patch(GET_CLIENT_PATH)
+    def test_delete_routine(self, mock_client):
+        self.hook.delete_routine(
+            dataset_id=DATASET_ID,
+            routine_id=self.ROUTINE_ID,
+            project_id=PROJECT_ID,
+        )
+
+        mock_client.return_value.delete_routine.assert_called_once()
+
+    @mock.patch(GET_CLIENT_PATH)
+    def test_delete_routine_not_found_ok(self, mock_client):
+        mock_client.return_value.delete_routine.side_effect = NotFound("nope")
+
+        # default not_found_ok=True — should not raise
+        self.hook.delete_routine(
+            dataset_id=DATASET_ID,
+            routine_id=self.ROUTINE_ID,
+            project_id=PROJECT_ID,
+        )
+
+    @mock.patch(GET_CLIENT_PATH)
+    def test_delete_routine_raises_when_not_found_ok_false(self, mock_client):
+        mock_client.return_value.delete_routine.side_effect = NotFound("nope")
+
+        with pytest.raises(NotFound):
+            self.hook.delete_routine(
+                dataset_id=DATASET_ID,
+                routine_id=self.ROUTINE_ID,
+                project_id=PROJECT_ID,
+                not_found_ok=False,
+            )
+
+    @mock.patch(GET_CLIENT_PATH)
+    def test_get_routine(self, mock_client):
+        self.hook.get_routine(
+            dataset_id=DATASET_ID,
+            routine_id=self.ROUTINE_ID,
+            project_id=PROJECT_ID,
+        )
+
+        mock_client.return_value.get_routine.assert_called_once()
+
+    @mock.patch(GET_CLIENT_PATH)
+    def test_list_routines(self, mock_client):
+        client = mock_client.return_value
+        client.list_routines.return_value = iter([mock.MagicMock(), mock.MagicMock()])
+
+        listed = self.hook.list_routines(dataset_id=DATASET_ID, project_id=PROJECT_ID)
+
+        assert len(listed) == 2
+        assert client.list_routines.call_args.kwargs["max_results"] is None
+
+
@pytest.mark.db_test
class TestBigQueryCursor(_BigQueryBaseTestClass):
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.build")
diff --git a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
index 70427e794e6..f857c6713d4 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
@@ -27,6 +27,7 @@ from unittest.mock import ANY, MagicMock
import pandas as pd
import pytest
from google.cloud.bigquery import DEFAULT_RETRY, ScalarQueryParameter, Table
+from google.cloud.bigquery.routine import Routine
from google.cloud.exceptions import Conflict
from airflow.providers.common.compat.openlineage.facet import (
@@ -52,16 +53,21 @@ from airflow.providers.google.cloud.operators.bigquery
import (
BigQueryCheckOperator,
BigQueryColumnCheckOperator,
BigQueryCreateEmptyDatasetOperator,
+ BigQueryCreateRoutineOperator,
BigQueryCreateTableOperator,
BigQueryDeleteDatasetOperator,
+ BigQueryDeleteRoutineOperator,
BigQueryDeleteTableOperator,
BigQueryGetDataOperator,
BigQueryGetDatasetOperator,
BigQueryGetDatasetTablesOperator,
+ BigQueryGetRoutineOperator,
BigQueryInsertJobOperator,
BigQueryIntervalCheckOperator,
+ BigQueryListRoutinesOperator,
BigQueryTableCheckOperator,
BigQueryUpdateDatasetOperator,
+ BigQueryUpdateRoutineOperator,
BigQueryUpdateTableOperator,
BigQueryUpdateTableSchemaOperator,
BigQueryUpsertTableOperator,
@@ -2877,3 +2883,180 @@ class TestBigQueryTableCheckOperator:
job_id="",
nowait=False,
)
+
+
+# Shared fixtures for the routine operator tests below.
+TEST_ROUTINE_ID = "test-routine-id"
+TEST_ROUTINE_REF = {
+    "projectId": TEST_GCP_PROJECT_ID,
+    "datasetId": TEST_DATASET,
+    "routineId": TEST_ROUTINE_ID,
+}
+TEST_ROUTINE_RESOURCE = {
+    "routineReference": TEST_ROUTINE_REF,
+    "routineType": "SCALAR_FUNCTION",
+    "language": "SQL",
+    "definitionBody": "x + 1",
+    "arguments": [{"name": "x", "dataType": {"typeKind": "INT64"}}],
+    "returnType": {"typeKind": "INT64"},
+}
+
+
+class TestBigQueryCreateRoutineOperator:
+    """Checks how the create operator forwards its arguments to the hook."""
+
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_execute_with_resource(self, mock_hook):
+        create_mock = mock_hook.return_value.create_routine
+        create_mock.return_value.to_api_repr.return_value = TEST_ROUTINE_RESOURCE
+        op = BigQueryCreateRoutineOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            routine_id=TEST_ROUTINE_ID,
+            project_id=TEST_GCP_PROJECT_ID,
+            routine_resource={**TEST_ROUTINE_RESOURCE},
+        )
+
+        assert op.execute(context=MagicMock()) == TEST_ROUTINE_RESOURCE
+
+        create_mock.assert_called_once()
+        passed = create_mock.call_args.kwargs
+        assert passed["dataset_id"] == TEST_DATASET
+        assert passed["routine_id"] == TEST_ROUTINE_ID
+        assert passed["project_id"] == TEST_GCP_PROJECT_ID
+        assert passed["if_exists"] == "fail"
+
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_execute_builds_resource_from_fields(self, mock_hook):
+        create_mock = mock_hook.return_value.create_routine
+        create_mock.return_value.to_api_repr.return_value = {}
+        op = BigQueryCreateRoutineOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            routine_id=TEST_ROUTINE_ID,
+            project_id=TEST_GCP_PROJECT_ID,
+            routine_type="SCALAR_FUNCTION",
+            language="SQL",
+            definition_body="x + 1",
+            arguments=[{"name": "x", "dataType": {"typeKind": "INT64"}}],
+            return_type={"typeKind": "INT64"},
+            if_exists="replace",
+        )
+
+        op.execute(context=MagicMock())
+
+        passed = create_mock.call_args.kwargs
+        built_resource = passed["routine"]
+        assert built_resource["routineType"] == "SCALAR_FUNCTION"
+        assert built_resource["language"] == "SQL"
+        assert built_resource["definitionBody"] == "x + 1"
+        assert built_resource["returnType"] == {"typeKind": "INT64"}
+        assert passed["if_exists"] == "replace"
+
+    def test_invalid_if_exists(self):
+        with pytest.raises(ValueError, match="must be one of"):
+            BigQueryCreateRoutineOperator(
+                task_id=TASK_ID,
+                dataset_id=TEST_DATASET,
+                routine_id=TEST_ROUTINE_ID,
+                if_exists="bogus",
+            )
+
+
+class TestBigQueryUpdateRoutineOperator:
+    """Checks argument forwarding and constructor validation of the update operator."""
+
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_execute(self, mock_hook):
+        update_mock = mock_hook.return_value.update_routine
+        update_mock.return_value.to_api_repr.return_value = TEST_ROUTINE_RESOURCE
+        op = BigQueryUpdateRoutineOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            routine_id=TEST_ROUTINE_ID,
+            project_id=TEST_GCP_PROJECT_ID,
+            routine_resource={**TEST_ROUTINE_RESOURCE},
+            fields=["description", "definitionBody"],
+        )
+
+        assert op.execute(context=MagicMock()) == TEST_ROUTINE_RESOURCE
+
+        passed = update_mock.call_args.kwargs
+        assert passed["fields"] == ["description", "definitionBody"]
+        assert passed["dataset_id"] == TEST_DATASET
+        assert passed["routine_id"] == TEST_ROUTINE_ID
+
+    def test_empty_fields_raises(self):
+        with pytest.raises(ValueError, match="non-empty sequence"):
+            BigQueryUpdateRoutineOperator(
+                task_id=TASK_ID,
+                dataset_id=TEST_DATASET,
+                routine_id=TEST_ROUTINE_ID,
+                routine_resource={**TEST_ROUTINE_RESOURCE},
+                fields=[],
+            )
+
+
+class TestBigQueryDeleteRoutineOperator:
+    """Checks that the delete operator maps ``ignore_if_missing`` onto ``not_found_ok``."""
+
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_execute(self, mock_hook):
+        op = BigQueryDeleteRoutineOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            routine_id=TEST_ROUTINE_ID,
+            project_id=TEST_GCP_PROJECT_ID,
+        )
+
+        op.execute(context=MagicMock())
+
+        passed = mock_hook.return_value.delete_routine.call_args.kwargs
+        assert passed["dataset_id"] == TEST_DATASET
+        assert passed["routine_id"] == TEST_ROUTINE_ID
+        assert passed["project_id"] == TEST_GCP_PROJECT_ID
+        assert passed["not_found_ok"] is False
+
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_execute_ignore_if_missing(self, mock_hook):
+        op = BigQueryDeleteRoutineOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            routine_id=TEST_ROUTINE_ID,
+            project_id=TEST_GCP_PROJECT_ID,
+            ignore_if_missing=True,
+        )
+
+        op.execute(context=MagicMock())
+
+        passed = mock_hook.return_value.delete_routine.call_args.kwargs
+        assert passed["not_found_ok"] is True
+
+
+class TestBigQueryGetRoutineOperator:
+    """Checks that the get operator returns the hook result's API representation."""
+
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_execute(self, mock_hook):
+        get_mock = mock_hook.return_value.get_routine
+        get_mock.return_value.to_api_repr.return_value = TEST_ROUTINE_RESOURCE
+        op = BigQueryGetRoutineOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            routine_id=TEST_ROUTINE_ID,
+            project_id=TEST_GCP_PROJECT_ID,
+        )
+
+        assert op.execute(context=MagicMock()) == TEST_ROUTINE_RESOURCE
+
+        passed = get_mock.call_args.kwargs
+        assert passed["dataset_id"] == TEST_DATASET
+        assert passed["routine_id"] == TEST_ROUTINE_ID
+
+
+class TestBigQueryListRoutinesOperator:
+    """Checks that the list operator serializes every routine returned by the hook."""
+
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_execute(self, mock_hook):
+        expected = [
+            {"routineReference": {"routineId": "r1"}},
+            {"routineReference": {"routineId": "r2"}},
+        ]
+        # One spec'd Routine mock per expected entry, each answering with its own repr.
+        first, second = MagicMock(spec=Routine), MagicMock(spec=Routine)
+        first.to_api_repr.return_value = {"routineReference": {"routineId": "r1"}}
+        second.to_api_repr.return_value = {"routineReference": {"routineId": "r2"}}
+        list_mock = mock_hook.return_value.list_routines
+        list_mock.return_value = [first, second]
+
+        op = BigQueryListRoutinesOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            project_id=TEST_GCP_PROJECT_ID,
+            max_results=10,
+        )
+
+        assert op.execute(context=MagicMock()) == expected
+
+        passed = list_mock.call_args.kwargs
+        assert passed["dataset_id"] == TEST_DATASET
+        assert passed["max_results"] == 10
diff --git a/providers/google/tests/unit/google/cloud/sensors/test_bigquery.py b/providers/google/tests/unit/google/cloud/sensors/test_bigquery.py
index 00776d89013..174f3b217bd 100644
--- a/providers/google/tests/unit/google/cloud/sensors/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/sensors/test_bigquery.py
@@ -19,9 +19,11 @@ from __future__ import annotations
from unittest import mock
import pytest
+from google.api_core.exceptions import NotFound
from airflow.providers.common.compat.sdk import AirflowException, TaskDeferred
from airflow.providers.google.cloud.sensors.bigquery import (
+ BigQueryRoutineExistenceSensor,
BigQueryTableExistenceSensor,
BigQueryTablePartitionExistenceSensor,
)
@@ -245,6 +247,43 @@ class TestBigqueryTablePartitionExistenceSensor:
)
+class TestBigQueryRoutineExistenceSensor:
+    """Checks both outcomes of a single ``poke`` of the routine existence sensor."""
+
+    ROUTINE_ID = "test_routine_id"
+
+    @mock.patch("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook")
+    def test_poke_returns_true_when_routine_exists(self, mock_hook):
+        sensor = BigQueryRoutineExistenceSensor(
+            task_id="task-id",
+            project_id=TEST_PROJECT_ID,
+            dataset_id=TEST_DATASET_ID,
+            routine_id=self.ROUTINE_ID,
+            gcp_conn_id=TEST_GCP_CONN_ID,
+            impersonation_chain=TEST_IMPERSONATION_CHAIN,
+        )
+
+        poke_result = sensor.poke(mock.MagicMock())
+
+        assert poke_result is True
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=TEST_GCP_CONN_ID,
+            impersonation_chain=TEST_IMPERSONATION_CHAIN,
+        )
+        mock_hook.return_value.get_routine.assert_called_once_with(
+            project_id=TEST_PROJECT_ID,
+            dataset_id=TEST_DATASET_ID,
+            routine_id=self.ROUTINE_ID,
+        )
+
+    @mock.patch("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook")
+    def test_poke_returns_false_when_routine_missing(self, mock_hook):
+        mock_hook.return_value.get_routine.side_effect = NotFound("not found")
+        sensor = BigQueryRoutineExistenceSensor(
+            task_id="task-id",
+            project_id=TEST_PROJECT_ID,
+            dataset_id=TEST_DATASET_ID,
+            routine_id=self.ROUTINE_ID,
+        )
+
+        poke_result = sensor.poke(mock.MagicMock())
+
+        assert poke_result is False
+
+
@pytest.fixture
def context():
"""