This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b51aaf59d2280e4341807ed39e3a110a26627426
Author: Kacper Muda <mudakac...@gmail.com>
AuthorDate: Tue Nov 21 18:14:36 2023 +0100

    Add OpenLineage support to GCSToBigQueryOperator (#35778)
---
 .../google/cloud/transfers/gcs_to_bigquery.py      |  77 +++++
 .../google/cloud/transfers/test_gcs_to_bigquery.py | 380 ++++++++++++++++++++-
 2 files changed, 455 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py 
b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index 94c233d6c5..798bc8a52b 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -41,6 +41,7 @@ from airflow.providers.google.cloud.hooks.bigquery import 
BigQueryHook, BigQuery
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
 from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
 from airflow.providers.google.cloud.triggers.bigquery import 
BigQueryInsertJobTrigger
+from airflow.utils.helpers import merge_dicts
 
 if TYPE_CHECKING:
     from google.api_core.retry import Retry
@@ -294,6 +295,8 @@ class GCSToBigQueryOperator(BaseOperator):
         self.reattach_states: set[str] = reattach_states or set()
         self.cancel_on_kill = cancel_on_kill
 
+        self.source_uris: list[str] = []
+
     def _submit_job(
         self,
         hook: BigQueryHook,
@@ -731,3 +734,77 @@ class GCSToBigQueryOperator(BaseOperator):
             self.hook.cancel_job(job_id=self.job_id, location=self.location)  
# type: ignore[union-attr]
         else:
             self.log.info("Skipping to cancel job: %s.%s", self.location, 
self.job_id)
+
+    def get_openlineage_facets_on_complete(self, task_instance):
+        """Implement on_complete so that the final BQ job id can be included."""
+        from pathlib import Path
+
+        from openlineage.client.facet import (
+            ExternalQueryRunFacet,
+            SymlinksDatasetFacet,
+            SymlinksDatasetFacetIdentifiers,
+        )
+        from openlineage.client.run import Dataset
+
+        from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
+        from airflow.providers.google.cloud.utils.openlineage import (
+            get_facets_from_bq_table,
+            get_identity_column_lineage_facet,
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        table_object = self.hook.get_client(self.hook.project_id).get_table(
+            self.destination_project_dataset_table
+        )
+
+        output_dataset_facets = get_facets_from_bq_table(table_object)
+
+        input_dataset_facets = {
+            "schema": output_dataset_facets["schema"],
+        }
+        input_datasets = []
+        for uri in sorted(self.source_uris):
+            bucket, blob = _parse_gcs_url(uri)
+            additional_facets = {}
+
+            if "*" in blob:
+                # If a wildcard ("*") is used in the GCS path, we want the dataset name to be the directory name,
+                # but we create a symlink to the full object path with the wildcard.
+                additional_facets = {
+                    "symlink": SymlinksDatasetFacet(
+                        identifiers=[
+                            SymlinksDatasetFacetIdentifiers(
+                                namespace=f"gs://{bucket}", name=blob, 
type="file"
+                            )
+                        ]
+                    ),
+                }
+                blob = Path(blob).parent.as_posix()
+                if blob == ".":
+                    # blob path does not have leading slash, but we need root 
dataset name to be "/"
+                    blob = "/"
+
+            dataset = Dataset(
+                namespace=f"gs://{bucket}",
+                name=blob,
+                facets=merge_dicts(input_dataset_facets, additional_facets),
+            )
+            input_datasets.append(dataset)
+
+        output_dataset_facets["columnLineage"] = 
get_identity_column_lineage_facet(
+            field_names=[field.name for field in table_object.schema], 
input_datasets=input_datasets
+        )
+
+        output_dataset = Dataset(
+            namespace="bigquery",
+            name=str(table_object.reference),
+            facets=output_dataset_facets,
+        )
+
+        run_facets = {}
+        if self.job_id:
+            run_facets = {
+                "externalQuery": 
ExternalQueryRunFacet(externalQueryId=self.job_id, source="bigquery"),
+            }
+
+        return OperatorLineage(inputs=input_datasets, 
outputs=[output_dataset], run_facets=run_facets)
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py 
b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py
index a67759ff85..b7087b6785 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py
@@ -22,8 +22,20 @@ from unittest import mock
 from unittest.mock import MagicMock, call
 
 import pytest
-from google.cloud.bigquery import DEFAULT_RETRY
+from google.cloud.bigquery import DEFAULT_RETRY, Table
 from google.cloud.exceptions import Conflict
+from openlineage.client.facet import (
+    ColumnLineageDatasetFacet,
+    ColumnLineageDatasetFacetFieldsAdditional,
+    ColumnLineageDatasetFacetFieldsAdditionalInputFields,
+    DocumentationDatasetFacet,
+    ExternalQueryRunFacet,
+    SchemaDatasetFacet,
+    SchemaField,
+    SymlinksDatasetFacet,
+    SymlinksDatasetFacetIdentifiers,
+)
+from openlineage.client.run import Dataset
 
 from airflow.exceptions import AirflowException, TaskDeferred
 from airflow.models import DAG
@@ -37,6 +49,9 @@ from airflow.utils.types import DagRunType
 TASK_ID = "test-gcs-to-bq-operator"
 TEST_EXPLICIT_DEST = "test-project.dataset.table"
 TEST_BUCKET = "test-bucket"
+TEST_FOLDER = "test-folder"
+TEST_OBJECT_NO_WILDCARD = "file.extension"
+TEST_OBJECT_WILDCARD = "file_*.extension"
 PROJECT_ID = "test-project"
 DATASET = "dataset"
 TABLE = "table"
@@ -59,7 +74,21 @@ TEST_SOURCE_OBJECTS = "test/objects/test.csv"
 TEST_SOURCE_OBJECTS_JSON = "test/objects/test.json"
 LABELS = {"k1": "v1"}
 DESCRIPTION = "Test Description"
-
+TEST_TABLE: Table = Table.from_api_repr(
+    {
+        "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, 
"tableId": TABLE},
+        "description": DESCRIPTION,
+        "schema": {
+            "fields": [
+                {"name": "field1", "type": "STRING", "description": "field1 
description"},
+                {"name": "field2", "type": "INTEGER"},
+            ]
+        },
+    }
+)
+TEST_EMPTY_TABLE: Table = Table.from_api_repr(
+    {"tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, 
"tableId": TABLE}}
+)
 job_id = "123456"
 hash_ = "hash"
 pytest.real_job_id = f"{job_id}_{hash_}"
@@ -1209,6 +1238,353 @@ class TestGCSToBigQueryOperator:
             },
         )
 
+    @pytest.mark.parametrize(
+        ("source_object", "expected_dataset_name"),
+        (
+            (f"{TEST_FOLDER}/{TEST_OBJECT_NO_WILDCARD}", 
f"{TEST_FOLDER}/{TEST_OBJECT_NO_WILDCARD}"),
+            (TEST_OBJECT_NO_WILDCARD, TEST_OBJECT_NO_WILDCARD),
+            (f"{TEST_FOLDER}/{TEST_OBJECT_WILDCARD}", TEST_FOLDER),
+            (f"{TEST_OBJECT_WILDCARD}", "/"),
+            (f"{TEST_FOLDER}/*", TEST_FOLDER),
+        ),
+    )
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_get_openlineage_facets_on_complete_gcs_dataset_name(
+        self, hook, source_object, expected_dataset_name
+    ):
+        hook.return_value.insert_job.return_value = 
MagicMock(job_id=pytest.real_job_id, error_result=False)
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, 
TABLE)
+        operator = GCSToBigQueryOperator(
+            project_id=JOB_PROJECT_ID,
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=[source_object],
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+        )
+
+        expected_symlink = SymlinksDatasetFacet(
+            identifiers=[
+                SymlinksDatasetFacetIdentifiers(
+                    namespace=f"gs://{TEST_BUCKET}",
+                    name=source_object,
+                    type="file",
+                )
+            ]
+        )
+        operator.execute(context=mock.MagicMock())
+
+        lineage = operator.get_openlineage_facets_on_complete(None)
+        assert len(lineage.inputs) == 1
+        assert lineage.inputs[0].name == expected_dataset_name
+        if "*" in source_object:
+            assert lineage.inputs[0].facets.get("symlink")
+            assert lineage.inputs[0].facets.get("symlink") == expected_symlink
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_get_openlineage_facets_on_complete_gcs_multiple_uris(self, hook):
+        hook.return_value.insert_job.return_value = 
MagicMock(job_id=pytest.real_job_id, error_result=False)
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, 
TABLE)
+        operator = GCSToBigQueryOperator(
+            project_id=JOB_PROJECT_ID,
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=[
+                TEST_OBJECT_NO_WILDCARD,
+                TEST_OBJECT_WILDCARD,
+                f"{TEST_FOLDER}1/{TEST_OBJECT_NO_WILDCARD}",
+                f"{TEST_FOLDER}2/{TEST_OBJECT_WILDCARD}",
+            ],
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+        )
+
+        operator.execute(context=mock.MagicMock())
+
+        lineage = operator.get_openlineage_facets_on_complete(None)
+        assert len(lineage.inputs) == 4
+        assert lineage.inputs[0].name == TEST_OBJECT_NO_WILDCARD
+        assert lineage.inputs[1].name == "/"
+        assert lineage.inputs[1].facets.get("symlink") == SymlinksDatasetFacet(
+            identifiers=[
+                SymlinksDatasetFacetIdentifiers(
+                    namespace=f"gs://{TEST_BUCKET}",
+                    name=TEST_OBJECT_WILDCARD,
+                    type="file",
+                )
+            ]
+        )
+        assert lineage.inputs[2].name == 
f"{TEST_FOLDER}1/{TEST_OBJECT_NO_WILDCARD}"
+        assert lineage.inputs[3].name == f"{TEST_FOLDER}2"
+        assert lineage.inputs[3].facets.get("symlink") == SymlinksDatasetFacet(
+            identifiers=[
+                SymlinksDatasetFacetIdentifiers(
+                    namespace=f"gs://{TEST_BUCKET}",
+                    name=f"{TEST_FOLDER}2/{TEST_OBJECT_WILDCARD}",
+                    type="file",
+                )
+            ]
+        )
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_get_openlineage_facets_on_complete_bq_dataset(self, hook):
+        hook.return_value.insert_job.return_value = 
MagicMock(job_id=pytest.real_job_id, error_result=False)
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, 
TABLE)
+        hook.return_value.get_client.return_value.get_table.return_value = 
TEST_TABLE
+
+        expected_output_dataset_facets = {
+            "schema": SchemaDatasetFacet(
+                fields=[
+                    SchemaField(name="field1", type="STRING", 
description="field1 description"),
+                    SchemaField(name="field2", type="INTEGER"),
+                ]
+            ),
+            "documentation": DocumentationDatasetFacet(description="Test 
Description"),
+            "columnLineage": ColumnLineageDatasetFacet(
+                fields={
+                    "field1": ColumnLineageDatasetFacetFieldsAdditional(
+                        inputFields=[
+                            
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+                                namespace=f"gs://{TEST_BUCKET}", 
name=TEST_OBJECT_NO_WILDCARD, field="field1"
+                            )
+                        ],
+                        transformationType="IDENTITY",
+                        transformationDescription="identical",
+                    ),
+                    "field2": ColumnLineageDatasetFacetFieldsAdditional(
+                        inputFields=[
+                            
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+                                namespace=f"gs://{TEST_BUCKET}", 
name=TEST_OBJECT_NO_WILDCARD, field="field2"
+                            )
+                        ],
+                        transformationType="IDENTITY",
+                        transformationDescription="identical",
+                    ),
+                }
+            ),
+        }
+
+        operator = GCSToBigQueryOperator(
+            project_id=JOB_PROJECT_ID,
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=[TEST_OBJECT_NO_WILDCARD],
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+        )
+
+        operator.execute(context=mock.MagicMock())
+
+        lineage = operator.get_openlineage_facets_on_complete(None)
+        assert len(lineage.outputs) == 1
+        assert lineage.outputs[0] == Dataset(
+            namespace="bigquery",
+            name=TEST_EXPLICIT_DEST,
+            facets=expected_output_dataset_facets,
+        )
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def 
test_get_openlineage_facets_on_complete_bq_dataset_multiple_gcs_uris(self, 
hook):
+        hook.return_value.insert_job.return_value = 
MagicMock(job_id=pytest.real_job_id, error_result=False)
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, 
TABLE)
+        hook.return_value.get_client.return_value.get_table.return_value = 
TEST_TABLE
+
+        expected_output_dataset_facets = {
+            "schema": SchemaDatasetFacet(
+                fields=[
+                    SchemaField(name="field1", type="STRING", 
description="field1 description"),
+                    SchemaField(name="field2", type="INTEGER"),
+                ]
+            ),
+            "documentation": DocumentationDatasetFacet(description="Test 
Description"),
+            "columnLineage": ColumnLineageDatasetFacet(
+                fields={
+                    "field1": ColumnLineageDatasetFacetFieldsAdditional(
+                        inputFields=[
+                            
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+                                namespace=f"gs://{TEST_BUCKET}", 
name=TEST_OBJECT_NO_WILDCARD, field="field1"
+                            ),
+                            
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+                                namespace=f"gs://{TEST_BUCKET}", name="/", 
field="field1"
+                            ),
+                        ],
+                        transformationType="IDENTITY",
+                        transformationDescription="identical",
+                    ),
+                    "field2": ColumnLineageDatasetFacetFieldsAdditional(
+                        inputFields=[
+                            
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+                                namespace=f"gs://{TEST_BUCKET}", 
name=TEST_OBJECT_NO_WILDCARD, field="field2"
+                            ),
+                            
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+                                namespace=f"gs://{TEST_BUCKET}", name="/", 
field="field2"
+                            ),
+                        ],
+                        transformationType="IDENTITY",
+                        transformationDescription="identical",
+                    ),
+                }
+            ),
+        }
+
+        operator = GCSToBigQueryOperator(
+            project_id=JOB_PROJECT_ID,
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=[TEST_OBJECT_NO_WILDCARD, TEST_OBJECT_WILDCARD],
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+        )
+
+        operator.execute(context=mock.MagicMock())
+
+        lineage = operator.get_openlineage_facets_on_complete(None)
+        assert len(lineage.outputs) == 1
+        assert lineage.outputs[0] == Dataset(
+            namespace="bigquery",
+            name=TEST_EXPLICIT_DEST,
+            facets=expected_output_dataset_facets,
+        )
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_get_openlineage_facets_on_complete_empty_table(self, hook):
+        hook.return_value.insert_job.return_value = 
MagicMock(job_id=pytest.real_job_id, error_result=False)
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, 
TABLE)
+        hook.return_value.get_client.return_value.get_table.return_value = 
TEST_EMPTY_TABLE
+
+        expected_output_dataset_facets = {
+            "schema": SchemaDatasetFacet(fields=[]),
+            "documentation": DocumentationDatasetFacet(description=""),
+            "columnLineage": ColumnLineageDatasetFacet(fields={}),
+        }
+
+        operator = GCSToBigQueryOperator(
+            project_id=JOB_PROJECT_ID,
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=[TEST_OBJECT_NO_WILDCARD, TEST_OBJECT_WILDCARD],
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+        )
+
+        operator.execute(context=mock.MagicMock())
+
+        lineage = operator.get_openlineage_facets_on_complete(None)
+        assert len(lineage.inputs) == 2
+        assert len(lineage.outputs) == 1
+        assert lineage.outputs[0] == Dataset(
+            namespace="bigquery",
+            name=TEST_EXPLICIT_DEST,
+            facets=expected_output_dataset_facets,
+        )
+        assert lineage.inputs[0] == Dataset(
+            namespace=f"gs://{TEST_BUCKET}",
+            name=TEST_OBJECT_NO_WILDCARD,
+            facets={"schema": SchemaDatasetFacet(fields=[])},
+        )
+        assert lineage.inputs[1] == Dataset(
+            namespace=f"gs://{TEST_BUCKET}",
+            name="/",
+            facets={
+                "schema": SchemaDatasetFacet(fields=[]),
+                "symlink": SymlinksDatasetFacet(
+                    identifiers=[
+                        SymlinksDatasetFacetIdentifiers(
+                            namespace=f"gs://{TEST_BUCKET}",
+                            name=TEST_OBJECT_WILDCARD,
+                            type="file",
+                        )
+                    ]
+                ),
+            },
+        )
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def 
test_get_openlineage_facets_on_complete_full_table_multiple_gcs_uris(self, 
hook):
+        hook.return_value.insert_job.return_value = 
MagicMock(job_id=pytest.real_job_id, error_result=False)
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, 
TABLE)
+        hook.return_value.get_client.return_value.get_table.return_value = 
TEST_TABLE
+        hook.return_value.generate_job_id.return_value = pytest.real_job_id
+
+        schema_facet = SchemaDatasetFacet(
+            fields=[
+                SchemaField(name="field1", type="STRING", description="field1 
description"),
+                SchemaField(name="field2", type="INTEGER"),
+            ]
+        )
+
+        expected_input_wildcard_dataset_facets = {
+            "schema": schema_facet,
+            "symlink": SymlinksDatasetFacet(
+                identifiers=[
+                    SymlinksDatasetFacetIdentifiers(
+                        namespace=f"gs://{TEST_BUCKET}",
+                        name=TEST_OBJECT_WILDCARD,
+                        type="file",
+                    )
+                ]
+            ),
+        }
+        expected_input_no_wildcard_dataset_facets = {"schema": schema_facet}
+
+        expected_output_dataset_facets = {
+            "schema": schema_facet,
+            "documentation": DocumentationDatasetFacet(description="Test 
Description"),
+            "columnLineage": ColumnLineageDatasetFacet(
+                fields={
+                    "field1": ColumnLineageDatasetFacetFieldsAdditional(
+                        inputFields=[
+                            
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+                                namespace=f"gs://{TEST_BUCKET}", 
name=TEST_OBJECT_NO_WILDCARD, field="field1"
+                            ),
+                            
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+                                namespace=f"gs://{TEST_BUCKET}", name="/", 
field="field1"
+                            ),
+                        ],
+                        transformationType="IDENTITY",
+                        transformationDescription="identical",
+                    ),
+                    "field2": ColumnLineageDatasetFacetFieldsAdditional(
+                        inputFields=[
+                            
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+                                namespace=f"gs://{TEST_BUCKET}", 
name=TEST_OBJECT_NO_WILDCARD, field="field2"
+                            ),
+                            
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+                                namespace=f"gs://{TEST_BUCKET}", name="/", 
field="field2"
+                            ),
+                        ],
+                        transformationType="IDENTITY",
+                        transformationDescription="identical",
+                    ),
+                }
+            ),
+        }
+
+        operator = GCSToBigQueryOperator(
+            project_id=JOB_PROJECT_ID,
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=[TEST_OBJECT_NO_WILDCARD, TEST_OBJECT_WILDCARD],
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+        )
+
+        operator.execute(context=mock.MagicMock())
+
+        lineage = operator.get_openlineage_facets_on_complete(None)
+        assert len(lineage.inputs) == 2
+        assert len(lineage.outputs) == 1
+        assert lineage.outputs[0] == Dataset(
+            namespace="bigquery", name=TEST_EXPLICIT_DEST, 
facets=expected_output_dataset_facets
+        )
+
+        assert lineage.inputs[0] == Dataset(
+            namespace=f"gs://{TEST_BUCKET}",
+            name=TEST_OBJECT_NO_WILDCARD,
+            facets=expected_input_no_wildcard_dataset_facets,
+        )
+        assert lineage.inputs[1] == Dataset(
+            namespace=f"gs://{TEST_BUCKET}", name="/", 
facets=expected_input_wildcard_dataset_facets
+        )
+        assert lineage.run_facets == {
+            "externalQuery": 
ExternalQueryRunFacet(externalQueryId=pytest.real_job_id, source="bigquery")
+        }
+        assert lineage.job_facets == {}
+
 
 class TestAsyncGCSToBigQueryOperator:
     @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))

Reply via email to