This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new de635e7fd01 feat: Improve Hook Level Lineage for BigQueryHook (#62231)
de635e7fd01 is described below
commit de635e7fd01c5e12fc942b2ee108ec2e140f7c43
Author: Kacper Muda <[email protected]>
AuthorDate: Tue Feb 24 16:05:57 2026 +0100
feat: Improve Hook Level Lineage for BigQueryHook (#62231)
---
.../providers/google/cloud/hooks/bigquery.py | 38 +++-
.../providers/google/cloud/utils/lineage.py | 94 ++++++++
.../tests/unit/google/cloud/hooks/test_bigquery.py | 22 +-
.../tests/unit/google/cloud/utils/test_lineage.py | 237 +++++++++++++++++++++
4 files changed, 363 insertions(+), 28 deletions(-)
diff --git
a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
index 1963ff93a0b..c882fa26dd7 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
@@ -66,6 +66,7 @@ from airflow.providers.common.sql.hooks.lineage import send_sql_hook_lineage
from airflow.providers.common.sql.hooks.sql import DbApiHook
from airflow.providers.google.cloud.utils.bigquery import bq_cast
from airflow.providers.google.cloud.utils.credentials_provider import _get_scopes
+from airflow.providers.google.cloud.utils.lineage import send_hook_lineage_for_bq_job
from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.providers.google.common.deprecated import deprecated
from airflow.providers.google.common.hooks.base_google import (
@@ -88,6 +89,7 @@ if TYPE_CHECKING:
from google.api_core.retry import Retry
from requests import Session
+ from airflow.providers.openlineage.sqlparser import DatabaseInfo
from airflow.sdk import Context
log = logging.getLogger(__name__)
@@ -1330,19 +1332,10 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
# Start the job and wait for it to complete and get the result.
job_api_repr.result(timeout=timeout, retry=retry)
- self._send_hook_level_lineage_for_bq_job(job=job_api_repr)
+ send_hook_lineage_for_bq_job(context=self, job=job_api_repr)
return job_api_repr
- def _send_hook_level_lineage_for_bq_job(self, job):
- # TODO(kacpermuda) Add support for other job types and more params to
sql job
- if job.job_type == QueryJob.job_type:
- send_sql_hook_lineage(
- context=self,
- sql=job.query,
- job_id=job.job_id,
- )
-
def generate_job_id(
self,
job_id: str | None,
@@ -1503,6 +1496,31 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
scope_value = self._get_field("scope", None)
return _get_scopes(scope_value)
+ def get_openlineage_database_info(self, connection) -> DatabaseInfo:
+ """Return BigQuery specific information for OpenLineage."""
+ from airflow.providers.openlineage.sqlparser import DatabaseInfo
+
+ return DatabaseInfo(
+ scheme=self.get_openlineage_database_dialect(None),
+ authority=None,
+ database=self.project_id,
+ information_schema_columns=[
+ "table_schema",
+ "table_name",
+ "column_name",
+ "ordinal_position",
+ "data_type",
+ "table_catalog",
+ ],
+ information_schema_table_name="INFORMATION_SCHEMA.COLUMNS",
+ )
+
+ def get_openlineage_database_dialect(self, _) -> str:
+ return "bigquery"
+
+ def get_openlineage_default_schema(self) -> str | None:
+ return None
+
class BigQueryConnection:
"""
diff --git
a/providers/google/src/airflow/providers/google/cloud/utils/lineage.py
b/providers/google/src/airflow/providers/google/cloud/utils/lineage.py
new file mode 100644
index 00000000000..c5725976684
--- /dev/null
+++ b/providers/google/src/airflow/providers/google/cloud/utils/lineage.py
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+
+from google.cloud.bigquery import CopyJob, ExtractJob, LoadJob, QueryJob
+
+from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector
+from airflow.providers.common.sql.hooks.lineage import send_sql_hook_lineage
+
+log = logging.getLogger(__name__)
+
+
+def _add_bq_table_to_lineage(collector, context, table_ref, *, is_input: bool):
+    method = collector.add_input_asset if is_input else collector.add_output_asset
+ method(
+ context=context,
+ scheme="bigquery",
+ asset_kwargs={
+ "project_id": table_ref.project,
+ "dataset_id": table_ref.dataset_id,
+ "table_id": table_ref.table_id,
+ },
+ )
+
+
+def _add_gcs_uris_to_lineage(collector, context, uris, *, is_input: bool):
+    method = collector.add_input_asset if is_input else collector.add_output_asset
+ for uri in uris or []:
+ method(context=context, uri=uri)
+
+
+def send_hook_lineage_for_bq_job(context, job):
+ """
+ Send hook-level lineage for a BigQuery job to the lineage collector.
+
+ Handles all four BigQuery job types:
+ - QUERY: delegates to send_sql_hook_lineage for SQL parsing
+ - LOAD: source URIs (GCS) as inputs, destination table as output
+ - COPY: source tables as inputs, destination table as output
+ - EXTRACT: source table as input, destination URIs (GCS) as outputs
+
+ :param context: The hook instance used as lineage context.
+    :param job: A BigQuery job object (QueryJob, LoadJob, CopyJob, or ExtractJob).
+ """
+ collector = get_hook_lineage_collector()
+
+ if isinstance(job, QueryJob):
+ log.debug("Sending Hook Level Lineage for Query job.")
+ send_sql_hook_lineage(
+ context=context,
+ sql=job.query,
+ job_id=job.job_id,
+            default_db=job.default_dataset.project if job.default_dataset else None,
+            default_schema=job.default_dataset.dataset_id if job.default_dataset else None,
+ )
+ return
+
+    try:
+        if isinstance(job, LoadJob):
+            log.debug("Sending Hook Level Lineage for Load job.")
+            _add_gcs_uris_to_lineage(collector, context, job.source_uris, is_input=True)
+            if job.destination:
+                _add_bq_table_to_lineage(collector, context, job.destination, is_input=False)
+        elif isinstance(job, CopyJob):
+            log.debug("Sending Hook Level Lineage for Copy job.")
+            for source_table in job.sources or []:
+                _add_bq_table_to_lineage(collector, context, source_table, is_input=True)
+            if job.destination:
+                _add_bq_table_to_lineage(collector, context, job.destination, is_input=False)
+        elif isinstance(job, ExtractJob):
+            log.debug("Sending Hook Level Lineage for Extract job.")
+            if job.source:
+                _add_bq_table_to_lineage(collector, context, job.source, is_input=True)
+            _add_gcs_uris_to_lineage(collector, context, job.destination_uris, is_input=False)
+    except Exception as e:
+        log.warning("Sending BQ job hook level lineage failed: %s", f"{e.__class__.__name__}: {str(e)}")
+        log.debug("Exception details:", exc_info=True)
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
index a2cd8894e4f..bbd4f64bf46 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
@@ -2068,23 +2068,13 @@ class TestHookLevelLineage(_BigQueryBaseTestClass):
assert call_kw["sql"] == sql
assert call_kw["sql_parameters"] == parameters
-    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.send_sql_hook_lineage")
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.send_hook_lineage_for_bq_job")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.QueryJob")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
def test_insert_job_hook_lineage(self, mock_client, mock_query_job, mock_send_lineage):
- query_job_type = "query"
- job_conf = {
- query_job_type: {
- query_job_type: "SELECT * FROM test",
- "useLegacySql": "False",
- }
- }
- mock_query_job._JOB_TYPE = query_job_type
- mock_query_job.job_type = query_job_type
+        job_conf = {"query": {"query": "SELECT * FROM test", "useLegacySql": "False"}}
+ mock_query_job._JOB_TYPE = "query"
mock_job_instance = mock.MagicMock()
- mock_job_instance.job_id = JOB_ID
- mock_job_instance.query = "SELECT * FROM test"
- mock_job_instance.job_type = query_job_type
mock_query_job.from_api_repr.return_value = mock_job_instance
self.hook.insert_job(
@@ -2095,8 +2085,4 @@ class TestHookLevelLineage(_BigQueryBaseTestClass):
nowait=True,
)
- mock_send_lineage.assert_called_once()
- call_kw = mock_send_lineage.call_args.kwargs
- assert call_kw["context"] is self.hook
- assert call_kw["sql"] == "SELECT * FROM test"
- assert call_kw["job_id"] == JOB_ID
+        mock_send_lineage.assert_called_once_with(context=self.hook, job=mock_job_instance)
diff --git a/providers/google/tests/unit/google/cloud/utils/test_lineage.py
b/providers/google/tests/unit/google/cloud/utils/test_lineage.py
new file mode 100644
index 00000000000..fd9dc22640b
--- /dev/null
+++ b/providers/google/tests/unit/google/cloud/utils/test_lineage.py
@@ -0,0 +1,237 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+from google.cloud.bigquery import CopyJob, DatasetReference, ExtractJob, LoadJob, QueryJob, TableReference
+
+from airflow.providers.common.compat.assets import Asset
+from airflow.providers.google.cloud.utils.lineage import (
+ _add_bq_table_to_lineage,
+ _add_gcs_uris_to_lineage,
+ send_hook_lineage_for_bq_job,
+)
+
+PROJECT_ID = "test-project"
+DATASET_ID = "test_dataset"
+TABLE_ID = "test_table"
+JOB_ID = "test-job-123"
+
+TABLE_REFERENCE = TableReference(DatasetReference(PROJECT_ID, DATASET_ID), TABLE_ID)
+
+
+def _make_table_ref(project, dataset, table):
+ return TableReference(DatasetReference(project, dataset), table)
+
+
+class TestAddBqTableToLineage:
+ def test_add_as_input(self):
+ collector = mock.MagicMock()
+ context = mock.sentinel.context
+
+ _add_bq_table_to_lineage(collector, context, TABLE_REFERENCE,
is_input=True)
+
+ collector.add_input_asset.assert_called_once_with(
+ context=context,
+ scheme="bigquery",
+ asset_kwargs={
+ "project_id": PROJECT_ID,
+ "dataset_id": DATASET_ID,
+ "table_id": TABLE_ID,
+ },
+ )
+ collector.add_output_asset.assert_not_called()
+
+ def test_add_as_output(self):
+ collector = mock.MagicMock()
+ context = mock.sentinel.context
+
+ _add_bq_table_to_lineage(collector, context, TABLE_REFERENCE,
is_input=False)
+
+ collector.add_output_asset.assert_called_once_with(
+ context=context,
+ scheme="bigquery",
+ asset_kwargs={
+ "project_id": PROJECT_ID,
+ "dataset_id": DATASET_ID,
+ "table_id": TABLE_ID,
+ },
+ )
+ collector.add_input_asset.assert_not_called()
+
+
+class TestAddGcsUrisToLineage:
+ def test_add_uris_as_input(self):
+ collector = mock.MagicMock()
+ context = mock.sentinel.context
+ uris = ["gs://bucket1/path/file.csv", "gs://bucket2/other.json"]
+
+ _add_gcs_uris_to_lineage(collector, context, uris, is_input=True)
+
+ assert collector.add_input_asset.call_count == 2
+ collector.add_input_asset.assert_any_call(context=context,
uri="gs://bucket1/path/file.csv")
+ collector.add_input_asset.assert_any_call(context=context,
uri="gs://bucket2/other.json")
+ collector.add_output_asset.assert_not_called()
+
+ def test_add_uris_as_output(self):
+ collector = mock.MagicMock()
+ context = mock.sentinel.context
+ uris = ["gs://bucket/export/data.csv"]
+
+ _add_gcs_uris_to_lineage(collector, context, uris, is_input=False)
+
+ collector.add_output_asset.assert_called_once_with(context=context,
uri="gs://bucket/export/data.csv")
+ collector.add_input_asset.assert_not_called()
+
+ def test_empty_uris(self):
+ collector = mock.MagicMock()
+ _add_gcs_uris_to_lineage(collector, mock.sentinel.context, [],
is_input=True)
+ collector.add_input_asset.assert_not_called()
+
+ def test_none_uris(self):
+ collector = mock.MagicMock()
+ _add_gcs_uris_to_lineage(collector, mock.sentinel.context, None,
is_input=True)
+ collector.add_input_asset.assert_not_called()
+
+
+class TestSendHookLineageForBqJob:
+
@mock.patch("airflow.providers.google.cloud.utils.lineage.send_sql_hook_lineage")
+ def test_query_job(self, mock_send_sql):
+ job = mock.MagicMock(spec=QueryJob)
+ job.query = "SELECT * FROM dataset.table"
+ job.job_id = JOB_ID
+ job.default_dataset = DatasetReference(PROJECT_ID, DATASET_ID)
+ context = mock.sentinel.context
+
+ send_hook_lineage_for_bq_job(context=context, job=job)
+
+ mock_send_sql.assert_called_once_with(
+ context=context,
+ sql="SELECT * FROM dataset.table",
+ job_id=JOB_ID,
+ default_db=PROJECT_ID,
+ default_schema=DATASET_ID,
+ )
+
+
@mock.patch("airflow.providers.google.cloud.utils.lineage.send_sql_hook_lineage")
+ def test_query_job_no_default_dataset(self, mock_send_sql):
+ job = mock.MagicMock(spec=QueryJob)
+ job.query = "SELECT 1"
+ job.job_id = JOB_ID
+ job.default_dataset = None
+ context = mock.sentinel.context
+
+ send_hook_lineage_for_bq_job(context=context, job=job)
+
+ mock_send_sql.assert_called_once_with(
+ context=context,
+ sql="SELECT 1",
+ job_id=JOB_ID,
+ default_db=None,
+ default_schema=None,
+ )
+
+ def test_load_job(self, hook_lineage_collector):
+ job = mock.MagicMock(spec=LoadJob)
+ job.source_uris = ["gs://bucket/data.csv", "gs://bucket/data2.csv"]
+ job.destination = TABLE_REFERENCE
+ context = mock.sentinel.context
+
+ send_hook_lineage_for_bq_job(context=context, job=job)
+
+ assert len(hook_lineage_collector.collected_assets.inputs) == 2
+ assert len(hook_lineage_collector.collected_assets.outputs) == 1
+ assert hook_lineage_collector.collected_assets.outputs[0].asset ==
Asset(
+ uri=f"bigquery://{PROJECT_ID}/{DATASET_ID}/{TABLE_ID}"
+ )
+
+ def test_load_job_no_destination(self, hook_lineage_collector):
+ job = mock.MagicMock(spec=LoadJob)
+ job.source_uris = ["gs://bucket/data.csv"]
+ job.destination = None
+ context = mock.sentinel.context
+
+ send_hook_lineage_for_bq_job(context=context, job=job)
+
+ assert len(hook_lineage_collector.collected_assets.inputs) == 1
+ assert len(hook_lineage_collector.collected_assets.outputs) == 0
+
+ def test_copy_job(self, hook_lineage_collector):
+ source1 = _make_table_ref(PROJECT_ID, DATASET_ID, "source1")
+ source2 = _make_table_ref(PROJECT_ID, DATASET_ID, "source2")
+ dest = _make_table_ref(PROJECT_ID, DATASET_ID, "dest")
+
+ job = mock.MagicMock(spec=CopyJob)
+ job.sources = [source1, source2]
+ job.destination = dest
+ context = mock.sentinel.context
+
+ send_hook_lineage_for_bq_job(context=context, job=job)
+
+ assert len(hook_lineage_collector.collected_assets.inputs) == 2
+ assert len(hook_lineage_collector.collected_assets.outputs) == 1
+ assert hook_lineage_collector.collected_assets.inputs[0].asset ==
Asset(
+ uri=f"bigquery://{PROJECT_ID}/{DATASET_ID}/source1"
+ )
+ assert hook_lineage_collector.collected_assets.inputs[1].asset ==
Asset(
+ uri=f"bigquery://{PROJECT_ID}/{DATASET_ID}/source2"
+ )
+ assert hook_lineage_collector.collected_assets.outputs[0].asset ==
Asset(
+ uri=f"bigquery://{PROJECT_ID}/{DATASET_ID}/dest"
+ )
+
+ def test_extract_job(self, hook_lineage_collector):
+ job = mock.MagicMock(spec=ExtractJob)
+ job.source = TABLE_REFERENCE
+ job.destination_uris = ["gs://bucket/export/file1.csv",
"gs://bucket/export/file2.csv"]
+ context = mock.sentinel.context
+
+ send_hook_lineage_for_bq_job(context=context, job=job)
+
+ assert len(hook_lineage_collector.collected_assets.inputs) == 1
+ assert len(hook_lineage_collector.collected_assets.outputs) == 2
+ assert hook_lineage_collector.collected_assets.inputs[0].asset ==
Asset(
+ uri=f"bigquery://{PROJECT_ID}/{DATASET_ID}/{TABLE_ID}"
+ )
+
+ def test_extract_job_no_source(self, hook_lineage_collector):
+ job = mock.MagicMock(spec=ExtractJob)
+ job.source = None
+ job.destination_uris = ["gs://bucket/export/file.csv"]
+ context = mock.sentinel.context
+
+ send_hook_lineage_for_bq_job(context=context, job=job)
+
+ assert len(hook_lineage_collector.collected_assets.inputs) == 0
+ assert len(hook_lineage_collector.collected_assets.outputs) == 1
+
+
@mock.patch("airflow.providers.google.cloud.utils.lineage.send_sql_hook_lineage")
+ def test_unknown_job_type_does_not_raise(self, mock_send_sql,
hook_lineage_collector):
+ job = mock.MagicMock()
+ send_hook_lineage_for_bq_job(context=mock.sentinel.context, job=job)
+ mock_send_sql.assert_not_called()
+ assert len(hook_lineage_collector.collected_assets.inputs) == 0
+ assert len(hook_lineage_collector.collected_assets.outputs) == 0
+
+ def test_exception_in_non_query_job_is_caught(self,
hook_lineage_collector):
+ job = mock.MagicMock(spec=LoadJob)
+        type(job).source_uris = mock.PropertyMock(side_effect=RuntimeError("boom"))
+ context = mock.sentinel.context
+
+ send_hook_lineage_for_bq_job(context=context, job=job)