This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0351fbf2c4e feature: Add OpenLineage support for BigQueryToMsSqlOperator (#55168)
0351fbf2c4e is described below
commit 0351fbf2c4ebe7c9081e86c6c0007efdaf549abb
Author: pawelgrochowicz <[email protected]>
AuthorDate: Wed Sep 3 10:46:44 2025 +0200
feature: Add OpenLineage support for BigQueryToMsSqlOperator (#55168)
* feature: Add OpenLineage support for BigQueryToMsSqlOperator
* feature: Add OpenLineage support for BigQueryToMsSqlOperator
* feature: Add OpenLineage support for BigQueryToMsSqlOperator
---
.../providers/google/cloud/openlineage/utils.py | 14 +++
.../google/cloud/transfers/bigquery_to_mssql.py | 73 +++++++++++++-
.../google/cloud/transfers/bigquery_to_sql.py | 10 +-
.../cloud/transfers/test_bigquery_to_mssql.py | 110 +++++++++++++++++++++
4 files changed, 203 insertions(+), 4 deletions(-)
diff --git
a/providers/google/src/airflow/providers/google/cloud/openlineage/utils.py
b/providers/google/src/airflow/providers/google/cloud/openlineage/utils.py
index aabeb581d7a..d0012f2301b 100644
--- a/providers/google/src/airflow/providers/google/cloud/openlineage/utils.py
+++ b/providers/google/src/airflow/providers/google/cloud/openlineage/utils.py
@@ -214,7 +214,20 @@ def extract_ds_name_from_gcs_path(path: str) -> str:
def get_facets_from_bq_table(table: Table) -> dict[str, DatasetFacet]:
"""Get facets from BigQuery table object."""
+    return get_facets_from_bq_table_for_given_fields(table, selected_fields=None)
+
+
+def get_facets_from_bq_table_for_given_fields(
+ table: Table, selected_fields: list[str] | None
+) -> dict[str, DatasetFacet]:
+ """
+ Get facets from BigQuery table object for selected fields only.
+
+ If selected_fields is None, include all fields.
+ """
facets: dict[str, DatasetFacet] = {}
+ selected_fields_set = set(selected_fields) if selected_fields else None
+
if table.schema:
facets["schema"] = SchemaDatasetFacet(
fields=[
@@ -222,6 +235,7 @@ def get_facets_from_bq_table(table: Table) -> dict[str, DatasetFacet]:
name=schema_field.name, type=schema_field.field_type,
description=schema_field.description
)
for schema_field in table.schema
+                if selected_fields_set is None or schema_field.name in selected_fields_set
]
)
if table.description:
diff --git
a/providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py
b/providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py
index ec63aeee5f6..0464fd79207 100644
---
a/providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py
+++
b/providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py
@@ -21,14 +21,17 @@ from __future__ import annotations
import warnings
from collections.abc import Sequence
+from functools import cached_property
from typing import TYPE_CHECKING
from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
from airflow.providers.google.cloud.transfers.bigquery_to_sql import BigQueryToSqlBaseOperator
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
if TYPE_CHECKING:
+ from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.utils.context import Context
@@ -94,9 +97,13 @@ class BigQueryToMsSqlOperator(BigQueryToSqlBaseOperator):
self.mssql_conn_id = mssql_conn_id
self.source_project_dataset_table = source_project_dataset_table
- def get_sql_hook(self) -> MsSqlHook:
+ @cached_property
+ def mssql_hook(self) -> MsSqlHook:
        return MsSqlHook(schema=self.database, mssql_conn_id=self.mssql_conn_id)
+ def get_sql_hook(self) -> MsSqlHook:
+ return self.mssql_hook
+
def persist_links(self, context: Context) -> None:
        project_id, dataset_id, table_id = self.source_project_dataset_table.split(".")
BigQueryTableLink.persist(
@@ -105,3 +112,67 @@ class BigQueryToMsSqlOperator(BigQueryToSqlBaseOperator):
project_id=project_id,
table_id=table_id,
)
+
+    def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage | None:
+ from airflow.providers.common.compat.openlineage.facet import Dataset
+ from airflow.providers.google.cloud.openlineage.utils import (
+ BIGQUERY_NAMESPACE,
+ get_facets_from_bq_table_for_given_fields,
+ get_identity_column_lineage_facet,
+ )
+ from airflow.providers.openlineage.extractors import OperatorLineage
+
+ if not self.bigquery_hook:
+ self.bigquery_hook = BigQueryHook(
+ gcp_conn_id=self.gcp_conn_id,
+ location=self.location,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ try:
+            table_obj = self.bigquery_hook.get_client().get_table(self.source_project_dataset_table)
+ except Exception:
+ self.log.debug(
+ "OpenLineage: could not fetch BigQuery table %s",
+ self.source_project_dataset_table,
+ exc_info=True,
+ )
+ return OperatorLineage()
+
+ if self.selected_fields:
+ if isinstance(self.selected_fields, str):
+ bigquery_field_names = list(self.selected_fields)
+ else:
+ bigquery_field_names = self.selected_fields
+ else:
+            bigquery_field_names = [f.name for f in getattr(table_obj, "schema", [])]
+
+ input_dataset = Dataset(
+ namespace=BIGQUERY_NAMESPACE,
+ name=self.source_project_dataset_table,
+            facets=get_facets_from_bq_table_for_given_fields(table_obj, bigquery_field_names),
+ )
+
+        db_info = self.mssql_hook.get_openlineage_database_info(self.mssql_hook.get_conn())
+ default_schema = self.mssql_hook.get_openlineage_default_schema()
+ namespace = f"{db_info.scheme}://{db_info.authority}"
+
+ if self.target_table_name and "." in self.target_table_name:
+ schema_name, table_name = self.target_table_name.split(".", 1)
+ else:
+ schema_name = default_schema or ""
+ table_name = self.target_table_name or ""
+
+ if self.database:
+ output_name = f"{self.database}.{schema_name}.{table_name}"
+ else:
+ output_name = f"{schema_name}.{table_name}"
+
+ column_lineage_facet = get_identity_column_lineage_facet(
+ bigquery_field_names, input_datasets=[input_dataset]
+ )
+
+ output_facets = column_lineage_facet or {}
+        output_dataset = Dataset(namespace=namespace, name=output_name, facets=output_facets)
+
+        return OperatorLineage(inputs=[input_dataset], outputs=[output_dataset])
diff --git
a/providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_sql.py
b/providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_sql.py
index dc3ad68fb81..20a9f8edc30 100644
---
a/providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_sql.py
+++
b/providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_sql.py
@@ -21,6 +21,7 @@ from __future__ import annotations
import abc
from collections.abc import Sequence
+from functools import cached_property
from typing import TYPE_CHECKING
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
@@ -113,19 +114,22 @@ class BigQueryToSqlBaseOperator(BaseOperator):
def persist_links(self, context: Context) -> None:
"""Persist the connection to the SQL provider."""
- def execute(self, context: Context) -> None:
- big_query_hook = BigQueryHook(
+ @cached_property
+ def bigquery_hook(self) -> BigQueryHook:
+ return BigQueryHook(
gcp_conn_id=self.gcp_conn_id,
location=self.location,
impersonation_chain=self.impersonation_chain,
)
+
+ def execute(self, context: Context) -> None:
self.persist_links(context)
sql_hook = self.get_sql_hook()
for rows in bigquery_get_data(
self.log,
self.dataset_id,
self.table_id,
- big_query_hook,
+ self.bigquery_hook,
self.batch_size,
self.selected_fields,
):
diff --git
a/providers/google/tests/unit/google/cloud/transfers/test_bigquery_to_mssql.py
b/providers/google/tests/unit/google/cloud/transfers/test_bigquery_to_mssql.py
index fb6023e6191..db26444c1fd 100644
---
a/providers/google/tests/unit/google/cloud/transfers/test_bigquery_to_mssql.py
+++
b/providers/google/tests/unit/google/cloud/transfers/test_bigquery_to_mssql.py
@@ -18,6 +18,7 @@
from __future__ import annotations
from unittest import mock
+from unittest.mock import MagicMock
from airflow.providers.google.cloud.transfers.bigquery_to_mssql import
BigQueryToMsSqlOperator
@@ -28,6 +29,24 @@ TEST_DAG_ID = "test-bigquery-operators"
TEST_PROJECT = "test-project"
+def _make_bq_table(schema_names: list[str]):
+ class TableObj:
+ def __init__(self, schema):
+ self.schema = []
+ for n in schema:
+ field = MagicMock()
+ field.name = n
+ self.schema.append(field)
+ self.description = "table description"
+ self.external_data_configuration = None
+ self.labels = {}
+ self.num_rows = 0
+ self.num_bytes = 0
+ self.table_type = "TABLE"
+
+ return TableObj(schema_names)
+
+
class TestBigQueryToMsSqlOperator:
@mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_mssql.BigQueryTableLink")
@mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_sql.BigQueryHook")
@@ -85,3 +104,94 @@ class TestBigQueryToMsSqlOperator:
project_id=TEST_PROJECT,
table_id=TEST_TABLE_ID,
)
+
+    @mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_mssql.MsSqlHook")
+    @mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_mssql.BigQueryHook")
+    def test_get_openlineage_facets_on_complete_no_selected_fields(self, mock_bq_hook, mock_mssql_hook):
+ mock_bq_client = MagicMock()
+ table_obj = _make_bq_table(["id", "name", "value"])
+ mock_bq_client.get_table.return_value = table_obj
+ mock_bq_hook.get_client.return_value = mock_bq_client
+ mock_bq_hook.return_value = mock_bq_hook
+
+        db_info = MagicMock(scheme="mssql", authority="localhost:1433", database="mydb")
+ mock_mssql_hook.get_openlineage_database_info.return_value = db_info
+ mock_mssql_hook.get_openlineage_default_schema.return_value = "dbo"
+ mock_mssql_hook.return_value = mock_mssql_hook
+
+ op = BigQueryToMsSqlOperator(
+ task_id="test",
+ source_project_dataset_table="proj.dataset.table",
+ target_table_name="dbo.destination",
+ selected_fields=None,
+ database="mydb",
+ )
+ op.bigquery_hook = mock_bq_hook
+ op.mssql_hook = mock_mssql_hook
+ context = mock.MagicMock()
+ op.execute(context=context)
+
+        result = op.get_openlineage_facets_on_complete(task_instance=MagicMock())
+ assert len(result.inputs) == 1
+ assert len(result.outputs) == 1
+
+ input_ds = result.inputs[0]
+ assert input_ds.namespace == "bigquery"
+ assert input_ds.name == "proj.dataset.table"
+
+ assert "schema" in input_ds.facets
+ schema_fields = [f.name for f in input_ds.facets["schema"].fields]
+ assert set(schema_fields) == {"id", "name", "value"}
+
+ output_ds = result.outputs[0]
+ assert output_ds.namespace == "mssql://localhost:1433"
+ assert output_ds.name == "mydb.dbo.destination"
+
+ assert "columnLineage" in output_ds.facets
+ col_lineage = output_ds.facets["columnLineage"]
+ assert set(col_lineage.fields.keys()) == {"id", "name", "value"}
+
+    @mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_mssql.MsSqlHook")
+    @mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_mssql.BigQueryHook")
+    def test_get_openlineage_facets_on_complete_selected_fields(self, mock_bq_hook, mock_mssql_hook):
+ mock_bq_client = MagicMock()
+ table_obj = _make_bq_table(["id", "name", "value"])
+ mock_bq_client.get_table.return_value = table_obj
+ mock_bq_hook.get_client.return_value = mock_bq_client
+ mock_bq_hook.return_value = mock_bq_hook
+
+        db_info = MagicMock(scheme="mssql", authority="server.example:1433", database="mydb")
+ mock_mssql_hook.get_openlineage_database_info.return_value = db_info
+ mock_mssql_hook.get_openlineage_default_schema.return_value = "dbo"
+ mock_mssql_hook.return_value = mock_mssql_hook
+
+ op = BigQueryToMsSqlOperator(
+ task_id="test",
+ source_project_dataset_table="proj.dataset.table",
+ target_table_name="dbo.destination",
+ selected_fields=["id", "name"],
+ database="mydb",
+ )
+ op.bigquery_hook = mock_bq_hook
+ op.mssql_hook = mock_mssql_hook
+ context = mock.MagicMock()
+ op.execute(context=context)
+
+        result = op.get_openlineage_facets_on_complete(task_instance=MagicMock())
+ assert len(result.inputs) == 1
+ assert len(result.outputs) == 1
+
+ input_ds = result.inputs[0]
+ assert input_ds.namespace == "bigquery"
+ assert "schema" in input_ds.facets
+
+ schema_fields = [f.name for f in input_ds.facets["schema"].fields]
+ assert set(schema_fields) == {"id", "name"}
+
+ output_ds = result.outputs[0]
+ assert output_ds.namespace == "mssql://server.example:1433"
+ assert output_ds.name == "mydb.dbo.destination"
+
+ assert "columnLineage" in output_ds.facets
+ col_lineage = output_ds.facets["columnLineage"]
+ assert set(col_lineage.fields.keys()) == {"id", "name"}