This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1fae1a50e9 Add OpenLineage support to GCSToBigQueryOperator (#35778)
1fae1a50e9 is described below
commit 1fae1a50e97fae9e414d062acb4f1e641523fa5a
Author: Kacper Muda <[email protected]>
AuthorDate: Tue Nov 21 18:14:36 2023 +0100
Add OpenLineage support to GCSToBigQueryOperator (#35778)
---
.../google/cloud/transfers/gcs_to_bigquery.py | 77 +++++
.../google/cloud/transfers/test_gcs_to_bigquery.py | 380 ++++++++++++++++++++-
2 files changed, 455 insertions(+), 2 deletions(-)
diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index 94c233d6c5..798bc8a52b 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -41,6 +41,7 @@ from airflow.providers.google.cloud.hooks.bigquery import
BigQueryHook, BigQuery
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
from airflow.providers.google.cloud.triggers.bigquery import
BigQueryInsertJobTrigger
+from airflow.utils.helpers import merge_dicts
if TYPE_CHECKING:
from google.api_core.retry import Retry
@@ -294,6 +295,8 @@ class GCSToBigQueryOperator(BaseOperator):
self.reattach_states: set[str] = reattach_states or set()
self.cancel_on_kill = cancel_on_kill
+ self.source_uris: list[str] = []
+
def _submit_job(
self,
hook: BigQueryHook,
@@ -731,3 +734,77 @@ class GCSToBigQueryOperator(BaseOperator):
self.hook.cancel_job(job_id=self.job_id, location=self.location)
# type: ignore[union-attr]
else:
self.log.info("Skipping to cancel job: %s.%s", self.location,
self.job_id)
+
+ def get_openlineage_facets_on_complete(self, task_instance):
+ """Implementing on_complete as we will include final BQ job id."""
+ from pathlib import Path
+
+ from openlineage.client.facet import (
+ ExternalQueryRunFacet,
+ SymlinksDatasetFacet,
+ SymlinksDatasetFacetIdentifiers,
+ )
+ from openlineage.client.run import Dataset
+
+ from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
+ from airflow.providers.google.cloud.utils.openlineage import (
+ get_facets_from_bq_table,
+ get_identity_column_lineage_facet,
+ )
+ from airflow.providers.openlineage.extractors import OperatorLineage
+
+ table_object = self.hook.get_client(self.hook.project_id).get_table(
+ self.destination_project_dataset_table
+ )
+
+ output_dataset_facets = get_facets_from_bq_table(table_object)
+
+ input_dataset_facets = {
+ "schema": output_dataset_facets["schema"],
+ }
+ input_datasets = []
+ for uri in sorted(self.source_uris):
+ bucket, blob = _parse_gcs_url(uri)
+ additional_facets = {}
+
+ if "*" in blob:
+ # If wildcard ("*") is used in gcs path, we want the name of
dataset to be directory name,
+ # but we create a symlink to the full object path with
wildcard.
+ additional_facets = {
+ "symlink": SymlinksDatasetFacet(
+ identifiers=[
+ SymlinksDatasetFacetIdentifiers(
+ namespace=f"gs://{bucket}", name=blob,
type="file"
+ )
+ ]
+ ),
+ }
+ blob = Path(blob).parent.as_posix()
+ if blob == ".":
+ # blob path does not have leading slash, but we need root
dataset name to be "/"
+ blob = "/"
+
+ dataset = Dataset(
+ namespace=f"gs://{bucket}",
+ name=blob,
+ facets=merge_dicts(input_dataset_facets, additional_facets),
+ )
+ input_datasets.append(dataset)
+
+ output_dataset_facets["columnLineage"] =
get_identity_column_lineage_facet(
+ field_names=[field.name for field in table_object.schema],
input_datasets=input_datasets
+ )
+
+ output_dataset = Dataset(
+ namespace="bigquery",
+ name=str(table_object.reference),
+ facets=output_dataset_facets,
+ )
+
+ run_facets = {}
+ if self.job_id:
+ run_facets = {
+ "externalQuery":
ExternalQueryRunFacet(externalQueryId=self.job_id, source="bigquery"),
+ }
+
+ return OperatorLineage(inputs=input_datasets,
outputs=[output_dataset], run_facets=run_facets)
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py
b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py
index a67759ff85..b7087b6785 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py
@@ -22,8 +22,20 @@ from unittest import mock
from unittest.mock import MagicMock, call
import pytest
-from google.cloud.bigquery import DEFAULT_RETRY
+from google.cloud.bigquery import DEFAULT_RETRY, Table
from google.cloud.exceptions import Conflict
+from openlineage.client.facet import (
+ ColumnLineageDatasetFacet,
+ ColumnLineageDatasetFacetFieldsAdditional,
+ ColumnLineageDatasetFacetFieldsAdditionalInputFields,
+ DocumentationDatasetFacet,
+ ExternalQueryRunFacet,
+ SchemaDatasetFacet,
+ SchemaField,
+ SymlinksDatasetFacet,
+ SymlinksDatasetFacetIdentifiers,
+)
+from openlineage.client.run import Dataset
from airflow.exceptions import AirflowException, TaskDeferred
from airflow.models import DAG
@@ -37,6 +49,9 @@ from airflow.utils.types import DagRunType
TASK_ID = "test-gcs-to-bq-operator"
TEST_EXPLICIT_DEST = "test-project.dataset.table"
TEST_BUCKET = "test-bucket"
+TEST_FOLDER = "test-folder"
+TEST_OBJECT_NO_WILDCARD = "file.extension"
+TEST_OBJECT_WILDCARD = "file_*.extension"
PROJECT_ID = "test-project"
DATASET = "dataset"
TABLE = "table"
@@ -59,7 +74,21 @@ TEST_SOURCE_OBJECTS = "test/objects/test.csv"
TEST_SOURCE_OBJECTS_JSON = "test/objects/test.json"
LABELS = {"k1": "v1"}
DESCRIPTION = "Test Description"
-
# Table reference shared by both fixture tables, built from the module constants.
_TEST_TABLE_REFERENCE = {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}

# Fully-described table: has a description and a two-column schema.
TEST_TABLE: Table = Table.from_api_repr(
    {
        "tableReference": {**_TEST_TABLE_REFERENCE},
        "description": DESCRIPTION,
        "schema": {
            "fields": [
                {"name": "field1", "type": "STRING", "description": "field1 description"},
                {"name": "field2", "type": "INTEGER"},
            ]
        },
    }
)
# Bare table: no description, no schema — used by the "empty table" lineage test.
TEST_EMPTY_TABLE: Table = Table.from_api_repr({"tableReference": {**_TEST_TABLE_REFERENCE}})
job_id = "123456"
hash_ = "hash"
pytest.real_job_id = f"{job_id}_{hash_}"
@@ -1209,6 +1238,353 @@ class TestGCSToBigQueryOperator:
},
)
+ @pytest.mark.parametrize(
+ ("source_object", "expected_dataset_name"),
+ (
+ (f"{TEST_FOLDER}/{TEST_OBJECT_NO_WILDCARD}",
f"{TEST_FOLDER}/{TEST_OBJECT_NO_WILDCARD}"),
+ (TEST_OBJECT_NO_WILDCARD, TEST_OBJECT_NO_WILDCARD),
+ (f"{TEST_FOLDER}/{TEST_OBJECT_WILDCARD}", TEST_FOLDER),
+ (f"{TEST_OBJECT_WILDCARD}", "/"),
+ (f"{TEST_FOLDER}/*", TEST_FOLDER),
+ ),
+ )
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def test_get_openlineage_facets_on_complete_gcs_dataset_name(
+ self, hook, source_object, expected_dataset_name
+ ):
+ hook.return_value.insert_job.return_value =
MagicMock(job_id=pytest.real_job_id, error_result=False)
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
+ operator = GCSToBigQueryOperator(
+ project_id=JOB_PROJECT_ID,
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=[source_object],
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ )
+
+ expected_symlink = SymlinksDatasetFacet(
+ identifiers=[
+ SymlinksDatasetFacetIdentifiers(
+ namespace=f"gs://{TEST_BUCKET}",
+ name=source_object,
+ type="file",
+ )
+ ]
+ )
+ operator.execute(context=mock.MagicMock())
+
+ lineage = operator.get_openlineage_facets_on_complete(None)
+ assert len(lineage.inputs) == 1
+ assert lineage.inputs[0].name == expected_dataset_name
+ if "*" in source_object:
+ assert lineage.inputs[0].facets.get("symlink")
+ assert lineage.inputs[0].facets.get("symlink") == expected_symlink
+
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def test_get_openlineage_facets_on_complete_gcs_multiple_uris(self, hook):
+ hook.return_value.insert_job.return_value =
MagicMock(job_id=pytest.real_job_id, error_result=False)
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
+ operator = GCSToBigQueryOperator(
+ project_id=JOB_PROJECT_ID,
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=[
+ TEST_OBJECT_NO_WILDCARD,
+ TEST_OBJECT_WILDCARD,
+ f"{TEST_FOLDER}1/{TEST_OBJECT_NO_WILDCARD}",
+ f"{TEST_FOLDER}2/{TEST_OBJECT_WILDCARD}",
+ ],
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ )
+
+ operator.execute(context=mock.MagicMock())
+
+ lineage = operator.get_openlineage_facets_on_complete(None)
+ assert len(lineage.inputs) == 4
+ assert lineage.inputs[0].name == TEST_OBJECT_NO_WILDCARD
+ assert lineage.inputs[1].name == "/"
+ assert lineage.inputs[1].facets.get("symlink") == SymlinksDatasetFacet(
+ identifiers=[
+ SymlinksDatasetFacetIdentifiers(
+ namespace=f"gs://{TEST_BUCKET}",
+ name=TEST_OBJECT_WILDCARD,
+ type="file",
+ )
+ ]
+ )
+ assert lineage.inputs[2].name ==
f"{TEST_FOLDER}1/{TEST_OBJECT_NO_WILDCARD}"
+ assert lineage.inputs[3].name == f"{TEST_FOLDER}2"
+ assert lineage.inputs[3].facets.get("symlink") == SymlinksDatasetFacet(
+ identifiers=[
+ SymlinksDatasetFacetIdentifiers(
+ namespace=f"gs://{TEST_BUCKET}",
+ name=f"{TEST_FOLDER}2/{TEST_OBJECT_WILDCARD}",
+ type="file",
+ )
+ ]
+ )
+
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def test_get_openlineage_facets_on_complete_bq_dataset(self, hook):
+ hook.return_value.insert_job.return_value =
MagicMock(job_id=pytest.real_job_id, error_result=False)
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
+ hook.return_value.get_client.return_value.get_table.return_value =
TEST_TABLE
+
+ expected_output_dataset_facets = {
+ "schema": SchemaDatasetFacet(
+ fields=[
+ SchemaField(name="field1", type="STRING",
description="field1 description"),
+ SchemaField(name="field2", type="INTEGER"),
+ ]
+ ),
+ "documentation": DocumentationDatasetFacet(description="Test
Description"),
+ "columnLineage": ColumnLineageDatasetFacet(
+ fields={
+ "field1": ColumnLineageDatasetFacetFieldsAdditional(
+ inputFields=[
+
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+ namespace=f"gs://{TEST_BUCKET}",
name=TEST_OBJECT_NO_WILDCARD, field="field1"
+ )
+ ],
+ transformationType="IDENTITY",
+ transformationDescription="identical",
+ ),
+ "field2": ColumnLineageDatasetFacetFieldsAdditional(
+ inputFields=[
+
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+ namespace=f"gs://{TEST_BUCKET}",
name=TEST_OBJECT_NO_WILDCARD, field="field2"
+ )
+ ],
+ transformationType="IDENTITY",
+ transformationDescription="identical",
+ ),
+ }
+ ),
+ }
+
+ operator = GCSToBigQueryOperator(
+ project_id=JOB_PROJECT_ID,
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=[TEST_OBJECT_NO_WILDCARD],
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ )
+
+ operator.execute(context=mock.MagicMock())
+
+ lineage = operator.get_openlineage_facets_on_complete(None)
+ assert len(lineage.outputs) == 1
+ assert lineage.outputs[0] == Dataset(
+ namespace="bigquery",
+ name=TEST_EXPLICIT_DEST,
+ facets=expected_output_dataset_facets,
+ )
+
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def
test_get_openlineage_facets_on_complete_bq_dataset_multiple_gcs_uris(self,
hook):
+ hook.return_value.insert_job.return_value =
MagicMock(job_id=pytest.real_job_id, error_result=False)
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
+ hook.return_value.get_client.return_value.get_table.return_value =
TEST_TABLE
+
+ expected_output_dataset_facets = {
+ "schema": SchemaDatasetFacet(
+ fields=[
+ SchemaField(name="field1", type="STRING",
description="field1 description"),
+ SchemaField(name="field2", type="INTEGER"),
+ ]
+ ),
+ "documentation": DocumentationDatasetFacet(description="Test
Description"),
+ "columnLineage": ColumnLineageDatasetFacet(
+ fields={
+ "field1": ColumnLineageDatasetFacetFieldsAdditional(
+ inputFields=[
+
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+ namespace=f"gs://{TEST_BUCKET}",
name=TEST_OBJECT_NO_WILDCARD, field="field1"
+ ),
+
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+ namespace=f"gs://{TEST_BUCKET}", name="/",
field="field1"
+ ),
+ ],
+ transformationType="IDENTITY",
+ transformationDescription="identical",
+ ),
+ "field2": ColumnLineageDatasetFacetFieldsAdditional(
+ inputFields=[
+
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+ namespace=f"gs://{TEST_BUCKET}",
name=TEST_OBJECT_NO_WILDCARD, field="field2"
+ ),
+
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+ namespace=f"gs://{TEST_BUCKET}", name="/",
field="field2"
+ ),
+ ],
+ transformationType="IDENTITY",
+ transformationDescription="identical",
+ ),
+ }
+ ),
+ }
+
+ operator = GCSToBigQueryOperator(
+ project_id=JOB_PROJECT_ID,
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=[TEST_OBJECT_NO_WILDCARD, TEST_OBJECT_WILDCARD],
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ )
+
+ operator.execute(context=mock.MagicMock())
+
+ lineage = operator.get_openlineage_facets_on_complete(None)
+ assert len(lineage.outputs) == 1
+ assert lineage.outputs[0] == Dataset(
+ namespace="bigquery",
+ name=TEST_EXPLICIT_DEST,
+ facets=expected_output_dataset_facets,
+ )
+
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def test_get_openlineage_facets_on_complete_empty_table(self, hook):
+ hook.return_value.insert_job.return_value =
MagicMock(job_id=pytest.real_job_id, error_result=False)
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
+ hook.return_value.get_client.return_value.get_table.return_value =
TEST_EMPTY_TABLE
+
+ expected_output_dataset_facets = {
+ "schema": SchemaDatasetFacet(fields=[]),
+ "documentation": DocumentationDatasetFacet(description=""),
+ "columnLineage": ColumnLineageDatasetFacet(fields={}),
+ }
+
+ operator = GCSToBigQueryOperator(
+ project_id=JOB_PROJECT_ID,
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=[TEST_OBJECT_NO_WILDCARD, TEST_OBJECT_WILDCARD],
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ )
+
+ operator.execute(context=mock.MagicMock())
+
+ lineage = operator.get_openlineage_facets_on_complete(None)
+ assert len(lineage.inputs) == 2
+ assert len(lineage.outputs) == 1
+ assert lineage.outputs[0] == Dataset(
+ namespace="bigquery",
+ name=TEST_EXPLICIT_DEST,
+ facets=expected_output_dataset_facets,
+ )
+ assert lineage.inputs[0] == Dataset(
+ namespace=f"gs://{TEST_BUCKET}",
+ name=TEST_OBJECT_NO_WILDCARD,
+ facets={"schema": SchemaDatasetFacet(fields=[])},
+ )
+ assert lineage.inputs[1] == Dataset(
+ namespace=f"gs://{TEST_BUCKET}",
+ name="/",
+ facets={
+ "schema": SchemaDatasetFacet(fields=[]),
+ "symlink": SymlinksDatasetFacet(
+ identifiers=[
+ SymlinksDatasetFacetIdentifiers(
+ namespace=f"gs://{TEST_BUCKET}",
+ name=TEST_OBJECT_WILDCARD,
+ type="file",
+ )
+ ]
+ ),
+ },
+ )
+
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def
test_get_openlineage_facets_on_complete_full_table_multiple_gcs_uris(self,
hook):
+ hook.return_value.insert_job.return_value =
MagicMock(job_id=pytest.real_job_id, error_result=False)
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
+ hook.return_value.get_client.return_value.get_table.return_value =
TEST_TABLE
+ hook.return_value.generate_job_id.return_value = pytest.real_job_id
+
+ schema_facet = SchemaDatasetFacet(
+ fields=[
+ SchemaField(name="field1", type="STRING", description="field1
description"),
+ SchemaField(name="field2", type="INTEGER"),
+ ]
+ )
+
+ expected_input_wildcard_dataset_facets = {
+ "schema": schema_facet,
+ "symlink": SymlinksDatasetFacet(
+ identifiers=[
+ SymlinksDatasetFacetIdentifiers(
+ namespace=f"gs://{TEST_BUCKET}",
+ name=TEST_OBJECT_WILDCARD,
+ type="file",
+ )
+ ]
+ ),
+ }
+ expected_input_no_wildcard_dataset_facets = {"schema": schema_facet}
+
+ expected_output_dataset_facets = {
+ "schema": schema_facet,
+ "documentation": DocumentationDatasetFacet(description="Test
Description"),
+ "columnLineage": ColumnLineageDatasetFacet(
+ fields={
+ "field1": ColumnLineageDatasetFacetFieldsAdditional(
+ inputFields=[
+
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+ namespace=f"gs://{TEST_BUCKET}",
name=TEST_OBJECT_NO_WILDCARD, field="field1"
+ ),
+
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+ namespace=f"gs://{TEST_BUCKET}", name="/",
field="field1"
+ ),
+ ],
+ transformationType="IDENTITY",
+ transformationDescription="identical",
+ ),
+ "field2": ColumnLineageDatasetFacetFieldsAdditional(
+ inputFields=[
+
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+ namespace=f"gs://{TEST_BUCKET}",
name=TEST_OBJECT_NO_WILDCARD, field="field2"
+ ),
+
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+ namespace=f"gs://{TEST_BUCKET}", name="/",
field="field2"
+ ),
+ ],
+ transformationType="IDENTITY",
+ transformationDescription="identical",
+ ),
+ }
+ ),
+ }
+
+ operator = GCSToBigQueryOperator(
+ project_id=JOB_PROJECT_ID,
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=[TEST_OBJECT_NO_WILDCARD, TEST_OBJECT_WILDCARD],
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ )
+
+ operator.execute(context=mock.MagicMock())
+
+ lineage = operator.get_openlineage_facets_on_complete(None)
+ assert len(lineage.inputs) == 2
+ assert len(lineage.outputs) == 1
+ assert lineage.outputs[0] == Dataset(
+ namespace="bigquery", name=TEST_EXPLICIT_DEST,
facets=expected_output_dataset_facets
+ )
+
+ assert lineage.inputs[0] == Dataset(
+ namespace=f"gs://{TEST_BUCKET}",
+ name=TEST_OBJECT_NO_WILDCARD,
+ facets=expected_input_no_wildcard_dataset_facets,
+ )
+ assert lineage.inputs[1] == Dataset(
+ namespace=f"gs://{TEST_BUCKET}", name="/",
facets=expected_input_wildcard_dataset_facets
+ )
+ assert lineage.run_facets == {
+ "externalQuery":
ExternalQueryRunFacet(externalQueryId=pytest.real_job_id, source="bigquery")
+ }
+ assert lineage.job_facets == {}
+
class TestAsyncGCSToBigQueryOperator:
@mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))