This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 538e8d37a81 feature: Add OpenLineage support for BigQueryToMySqlOperator (#55219)
538e8d37a81 is described below

commit 538e8d37a81872db58c10e56bbe54dacbc5ff08c
Author: pawelgrochowicz <[email protected]>
AuthorDate: Mon Sep 8 09:34:37 2025 +0200

    feature: Add OpenLineage support for BigQueryToMySqlOperator (#55219)
    
    * feature: Add OpenLineage support for BigQueryToMsSqlOperator
    
    * feature: Add OpenLineage support for BigQueryToMySqlOperator
    
    * feature: Add OpenLineage support for BigQueryToMySqlOperator
---
 .../google/cloud/transfers/bigquery_to_mysql.py    |  72 +++++++++++++-
 .../cloud/transfers/test_bigquery_to_mysql.py      | 106 +++++++++++++++++++++
 2 files changed, 177 insertions(+), 1 deletion(-)

diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_mysql.py b/providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_mysql.py
index 33b1d6f7ddb..544d33f275f 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_mysql.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_mysql.py
@@ -21,11 +21,17 @@ from __future__ import annotations
 
 import warnings
 from collections.abc import Sequence
+from functools import cached_property
+from typing import TYPE_CHECKING
 
 from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 from airflow.providers.google.cloud.transfers.bigquery_to_sql import 
BigQueryToSqlBaseOperator
 from airflow.providers.mysql.hooks.mysql import MySqlHook
 
+if TYPE_CHECKING:
+    from airflow.providers.openlineage.extractors import OperatorLineage
+
 
 class BigQueryToMySqlOperator(BigQueryToSqlBaseOperator):
     """
@@ -76,5 +82,69 @@ class BigQueryToMySqlOperator(BigQueryToSqlBaseOperator):
         )
         self.mysql_conn_id = mysql_conn_id
 
-    def get_sql_hook(self) -> MySqlHook:
+    @cached_property
+    def mysql_hook(self) -> MySqlHook:
         return MySqlHook(schema=self.database, 
mysql_conn_id=self.mysql_conn_id)
+
+    def get_sql_hook(self) -> MySqlHook:
+        return self.mysql_hook
+
+    def execute(self, context):
+        # Set source_project_dataset_table here, after hooks are initialized 
and project_id is available
+        project_id = self.bigquery_hook.project_id
+        self.source_project_dataset_table = 
f"{project_id}.{self.dataset_id}.{self.table_id}"
+        return super().execute(context)
+
+    def get_openlineage_facets_on_complete(self, task_instance) -> 
OperatorLineage | None:
+        from airflow.providers.common.compat.openlineage.facet import Dataset
+        from airflow.providers.google.cloud.openlineage.utils import (
+            BIGQUERY_NAMESPACE,
+            get_facets_from_bq_table_for_given_fields,
+            get_identity_column_lineage_facet,
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        if not self.bigquery_hook:
+            self.bigquery_hook = BigQueryHook(
+                gcp_conn_id=self.gcp_conn_id,
+                location=self.location,
+                impersonation_chain=self.impersonation_chain,
+            )
+
+        try:
+            table_obj = 
self.bigquery_hook.get_client().get_table(self.source_project_dataset_table)
+        except Exception:
+            self.log.debug(
+                "OpenLineage: could not fetch BigQuery table %s",
+                self.source_project_dataset_table,
+                exc_info=True,
+            )
+            return OperatorLineage()
+
+        if self.selected_fields:
+            if isinstance(self.selected_fields, str):
+                bigquery_field_names = list(self.selected_fields)
+            else:
+                bigquery_field_names = self.selected_fields
+        else:
+            bigquery_field_names = [f.name for f in getattr(table_obj, 
"schema", [])]
+
+        input_dataset = Dataset(
+            namespace=BIGQUERY_NAMESPACE,
+            name=self.source_project_dataset_table,
+            facets=get_facets_from_bq_table_for_given_fields(table_obj, 
bigquery_field_names),
+        )
+
+        db_info = 
self.mysql_hook.get_openlineage_database_info(self.mysql_hook.get_conn())
+        namespace = f"{db_info.scheme}://{db_info.authority}"
+
+        output_name = f"{self.database}.{self.target_table_name}"
+
+        column_lineage_facet = get_identity_column_lineage_facet(
+            bigquery_field_names, input_datasets=[input_dataset]
+        )
+
+        output_facets = column_lineage_facet or {}
+        output_dataset = Dataset(namespace=namespace, name=output_name, 
facets=output_facets)
+
+        return OperatorLineage(inputs=[input_dataset], 
outputs=[output_dataset])
diff --git a/providers/google/tests/unit/google/cloud/transfers/test_bigquery_to_mysql.py b/providers/google/tests/unit/google/cloud/transfers/test_bigquery_to_mysql.py
index 3e24bef38d6..1865efe7cfc 100644
--- a/providers/google/tests/unit/google/cloud/transfers/test_bigquery_to_mysql.py
+++ b/providers/google/tests/unit/google/cloud/transfers/test_bigquery_to_mysql.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 from unittest import mock
+from unittest.mock import MagicMock
 
 from airflow.providers.google.cloud.transfers.bigquery_to_mysql import 
BigQueryToMySqlOperator
 
@@ -25,6 +26,25 @@ TASK_ID = "test-bq-create-table-operator"
 TEST_DATASET = "test-dataset"
 TEST_TABLE_ID = "test-table-id"
 TEST_DAG_ID = "test-bigquery-operators"
+TEST_PROJECT = "test-project"
+
+
+def _make_bq_table(schema_names: list[str]):
+    class TableObj:
+        def __init__(self, schema):
+            self.schema = []
+            for n in schema:
+                field = MagicMock()
+                field.name = n
+                self.schema.append(field)
+            self.description = "table description"
+            self.external_data_configuration = None
+            self.labels = {}
+            self.num_rows = 0
+            self.num_bytes = 0
+            self.table_type = "TABLE"
+
+    return TableObj(schema_names)
 
 
 class TestBigQueryToMySqlOperator:
@@ -46,3 +66,89 @@ class TestBigQueryToMySqlOperator:
             selected_fields=None,
             start_index=0,
         )
+
+    
@mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_sql.BigQueryHook")
+    
@mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_mysql.MySqlHook")
+    def test_get_openlineage_facets_on_complete_no_selected_fields(self, 
mock_mysql_hook, mock_bq_hook):
+        mock_bq_client = MagicMock()
+        mock_bq_client.get_table.return_value = _make_bq_table(["id", "name", 
"value"])
+        mock_bq_hook.get_client.return_value = mock_bq_client
+        mock_bq_hook.return_value = mock_bq_hook
+
+        db_info = MagicMock(scheme="mysql", authority="localhost:3306", 
database="mydb")
+        mock_mysql_hook.get_openlineage_database_info.return_value = db_info
+        mock_mysql_hook.return_value = mock_mysql_hook
+
+        op = BigQueryToMySqlOperator(
+            task_id=TASK_ID,
+            dataset_table=f"{TEST_DATASET}.{TEST_TABLE_ID}",
+            target_table_name="destination",
+            selected_fields=None,
+            database="mydb",
+        )
+        op.bigquery_hook = mock_bq_hook
+        op.bigquery_hook.project_id = TEST_PROJECT
+        op.mysql_hook = mock_mysql_hook
+        context = mock.MagicMock()
+        op.execute(context=context)
+
+        result = op.get_openlineage_facets_on_complete(None)
+        assert len(result.inputs) == 1
+        assert len(result.outputs) == 1
+
+        input_ds = result.inputs[0]
+        assert input_ds.namespace == "bigquery"
+        assert input_ds.name == 
f"{TEST_PROJECT}.{TEST_DATASET}.{TEST_TABLE_ID}"
+        assert "schema" in input_ds.facets
+        schema_fields = [f.name for f in input_ds.facets["schema"].fields]
+        assert set(schema_fields) == {"id", "name", "value"}
+
+        output_ds = result.outputs[0]
+        assert output_ds.namespace == "mysql://localhost:3306"
+        assert output_ds.name == "mydb.destination"
+        assert "columnLineage" in output_ds.facets
+        col_lineage = output_ds.facets["columnLineage"]
+        assert set(col_lineage.fields.keys()) == {"id", "name", "value"}
+
+    
@mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_sql.BigQueryHook")
+    
@mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_mysql.MySqlHook")
+    def test_get_openlineage_facets_on_complete_selected_fields(self, 
mock_mysql_hook, mock_bq_hook):
+        mock_bq_client = MagicMock()
+        mock_bq_client.get_table.return_value = _make_bq_table(["id", "name", 
"value"])
+        mock_bq_hook.get_client.return_value = mock_bq_client
+        mock_bq_hook.return_value = mock_bq_hook
+
+        db_info = MagicMock(scheme="mysql", authority="localhost:3306", 
database="mydb")
+        mock_mysql_hook.get_openlineage_database_info.return_value = db_info
+        mock_mysql_hook.return_value = mock_mysql_hook
+
+        op = BigQueryToMySqlOperator(
+            task_id=TASK_ID,
+            dataset_table=f"{TEST_DATASET}.{TEST_TABLE_ID}",
+            target_table_name="destination",
+            selected_fields=["id", "name"],
+            database="mydb",
+        )
+        op.bigquery_hook = mock_bq_hook
+        op.bigquery_hook.project_id = TEST_PROJECT
+        op.mysql_hook = mock_mysql_hook
+        context = mock.MagicMock()
+        op.execute(context=context)
+
+        result = op.get_openlineage_facets_on_complete(None)
+        assert len(result.inputs) == 1
+        assert len(result.outputs) == 1
+
+        input_ds = result.inputs[0]
+        assert input_ds.namespace == "bigquery"
+        assert input_ds.name == 
f"{TEST_PROJECT}.{TEST_DATASET}.{TEST_TABLE_ID}"
+        assert "schema" in input_ds.facets
+        schema_fields = [f.name for f in input_ds.facets["schema"].fields]
+        assert set(schema_fields) == {"id", "name"}
+
+        output_ds = result.outputs[0]
+        assert output_ds.namespace == "mysql://localhost:3306"
+        assert output_ds.name == "mydb.destination"
+        assert "columnLineage" in output_ds.facets
+        col_lineage = output_ds.facets["columnLineage"]
+        assert set(col_lineage.fields.keys()) == {"id", "name"}

Reply via email to