This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-8-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit b51aaf59d2280e4341807ed39e3a110a26627426 Author: Kacper Muda <mudakac...@gmail.com> AuthorDate: Tue Nov 21 18:14:36 2023 +0100 Add OpenLineage support to GCSToBigQueryOperator (#35778) --- .../google/cloud/transfers/gcs_to_bigquery.py | 77 +++++ .../google/cloud/transfers/test_gcs_to_bigquery.py | 380 ++++++++++++++++++++- 2 files changed, 455 insertions(+), 2 deletions(-) diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py index 94c233d6c5..798bc8a52b 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py @@ -41,6 +41,7 @@ from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQuery from airflow.providers.google.cloud.hooks.gcs import GCSHook from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink from airflow.providers.google.cloud.triggers.bigquery import BigQueryInsertJobTrigger +from airflow.utils.helpers import merge_dicts if TYPE_CHECKING: from google.api_core.retry import Retry @@ -294,6 +295,8 @@ class GCSToBigQueryOperator(BaseOperator): self.reattach_states: set[str] = reattach_states or set() self.cancel_on_kill = cancel_on_kill + self.source_uris: list[str] = [] + def _submit_job( self, hook: BigQueryHook, @@ -731,3 +734,77 @@ class GCSToBigQueryOperator(BaseOperator): self.hook.cancel_job(job_id=self.job_id, location=self.location) # type: ignore[union-attr] else: self.log.info("Skipping to cancel job: %s.%s", self.location, self.job_id) + + def get_openlineage_facets_on_complete(self, task_instance): + """Implementing on_complete as we will include final BQ job id.""" + from pathlib import Path + + from openlineage.client.facet import ( + ExternalQueryRunFacet, + SymlinksDatasetFacet, + SymlinksDatasetFacetIdentifiers, + ) + from openlineage.client.run import Dataset + + from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url + from airflow.providers.google.cloud.utils.openlineage import ( + get_facets_from_bq_table, + get_identity_column_lineage_facet, + ) + from airflow.providers.openlineage.extractors import OperatorLineage + + table_object = self.hook.get_client(self.hook.project_id).get_table( + self.destination_project_dataset_table + ) + + output_dataset_facets = get_facets_from_bq_table(table_object) + + input_dataset_facets = { + "schema": output_dataset_facets["schema"], + } + input_datasets = [] + for uri in sorted(self.source_uris): + bucket, blob = _parse_gcs_url(uri) + additional_facets = {} + + if "*" in blob: + # If wildcard ("*") is used in gcs path, we want the name of dataset to be directory name, + # but we create a symlink to the full object path with wildcard. + additional_facets = { + "symlink": SymlinksDatasetFacet( + identifiers=[ + SymlinksDatasetFacetIdentifiers( + namespace=f"gs://{bucket}", name=blob, type="file" + ) + ] + ), + } + blob = Path(blob).parent.as_posix() + if blob == ".": + # blob path does not have leading slash, but we need root dataset name to be "/" + blob = "/" + + dataset = Dataset( + namespace=f"gs://{bucket}", + name=blob, + facets=merge_dicts(input_dataset_facets, additional_facets), + ) + input_datasets.append(dataset) + + output_dataset_facets["columnLineage"] = get_identity_column_lineage_facet( + field_names=[field.name for field in table_object.schema], input_datasets=input_datasets + ) + + output_dataset = Dataset( + namespace="bigquery", + name=str(table_object.reference), + facets=output_dataset_facets, + ) + + run_facets = {} + if self.job_id: + run_facets = { + "externalQuery": ExternalQueryRunFacet(externalQueryId=self.job_id, source="bigquery"), + } + + return OperatorLineage(inputs=input_datasets, outputs=[output_dataset], run_facets=run_facets) diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py index a67759ff85..b7087b6785 100644 --- a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py @@ -22,8 +22,20 @@ from unittest import mock from unittest.mock import MagicMock, call import pytest -from google.cloud.bigquery import DEFAULT_RETRY +from google.cloud.bigquery import DEFAULT_RETRY, Table from google.cloud.exceptions import Conflict +from openlineage.client.facet import ( + ColumnLineageDatasetFacet, + ColumnLineageDatasetFacetFieldsAdditional, + ColumnLineageDatasetFacetFieldsAdditionalInputFields, + DocumentationDatasetFacet, + ExternalQueryRunFacet, + SchemaDatasetFacet, + SchemaField, + SymlinksDatasetFacet, + SymlinksDatasetFacetIdentifiers, +) +from openlineage.client.run import Dataset from airflow.exceptions import AirflowException, TaskDeferred from airflow.models import DAG @@ -37,6 +49,9 @@ from airflow.utils.types import DagRunType TASK_ID = "test-gcs-to-bq-operator" TEST_EXPLICIT_DEST = "test-project.dataset.table" TEST_BUCKET = "test-bucket" +TEST_FOLDER = "test-folder" +TEST_OBJECT_NO_WILDCARD = "file.extension" +TEST_OBJECT_WILDCARD = "file_*.extension" PROJECT_ID = "test-project" DATASET = "dataset" TABLE = "table" @@ -59,7 +74,21 @@ TEST_SOURCE_OBJECTS = "test/objects/test.csv" TEST_SOURCE_OBJECTS_JSON = "test/objects/test.json" LABELS = {"k1": "v1"} DESCRIPTION = "Test Description" - +TEST_TABLE: Table = Table.from_api_repr( + { + "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, + "description": DESCRIPTION, + "schema": { + "fields": [ + {"name": "field1", "type": "STRING", "description": "field1 description"}, + {"name": "field2", "type": "INTEGER"}, + ] + }, + } +) +TEST_EMPTY_TABLE: Table = Table.from_api_repr( + {"tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}} +) job_id = "123456" hash_ = "hash" pytest.real_job_id = f"{job_id}_{hash_}" @@ -1209,6 +1238,353 @@ class TestGCSToBigQueryOperator: }, ) + @pytest.mark.parametrize( + ("source_object", "expected_dataset_name"), + ( + (f"{TEST_FOLDER}/{TEST_OBJECT_NO_WILDCARD}", f"{TEST_FOLDER}/{TEST_OBJECT_NO_WILDCARD}"), + (TEST_OBJECT_NO_WILDCARD, TEST_OBJECT_NO_WILDCARD), + (f"{TEST_FOLDER}/{TEST_OBJECT_WILDCARD}", TEST_FOLDER), + (f"{TEST_OBJECT_WILDCARD}", "/"), + (f"{TEST_FOLDER}/*", TEST_FOLDER), + ), + ) + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) + def test_get_openlineage_facets_on_complete_gcs_dataset_name( + self, hook, source_object, expected_dataset_name + ): + hook.return_value.insert_job.return_value = MagicMock(job_id=pytest.real_job_id, error_result=False) + hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) + operator = GCSToBigQueryOperator( + project_id=JOB_PROJECT_ID, + task_id=TASK_ID, + bucket=TEST_BUCKET, + source_objects=[source_object], + destination_project_dataset_table=TEST_EXPLICIT_DEST, + ) + + expected_symlink = SymlinksDatasetFacet( + identifiers=[ + SymlinksDatasetFacetIdentifiers( + namespace=f"gs://{TEST_BUCKET}", + name=source_object, + type="file", + ) + ] + ) + operator.execute(context=mock.MagicMock()) + + lineage = operator.get_openlineage_facets_on_complete(None) + assert len(lineage.inputs) == 1 + assert lineage.inputs[0].name == expected_dataset_name + if "*" in source_object: + assert lineage.inputs[0].facets.get("symlink") + assert lineage.inputs[0].facets.get("symlink") == expected_symlink + + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) + def test_get_openlineage_facets_on_complete_gcs_multiple_uris(self, hook): + hook.return_value.insert_job.return_value = MagicMock(job_id=pytest.real_job_id, error_result=False) + hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) + operator = GCSToBigQueryOperator( + project_id=JOB_PROJECT_ID, + task_id=TASK_ID, + bucket=TEST_BUCKET, + source_objects=[ + TEST_OBJECT_NO_WILDCARD, + TEST_OBJECT_WILDCARD, + f"{TEST_FOLDER}1/{TEST_OBJECT_NO_WILDCARD}", + f"{TEST_FOLDER}2/{TEST_OBJECT_WILDCARD}", + ], + destination_project_dataset_table=TEST_EXPLICIT_DEST, + ) + + operator.execute(context=mock.MagicMock()) + + lineage = operator.get_openlineage_facets_on_complete(None) + assert len(lineage.inputs) == 4 + assert lineage.inputs[0].name == TEST_OBJECT_NO_WILDCARD + assert lineage.inputs[1].name == "/" + assert lineage.inputs[1].facets.get("symlink") == SymlinksDatasetFacet( + identifiers=[ + SymlinksDatasetFacetIdentifiers( + namespace=f"gs://{TEST_BUCKET}", + name=TEST_OBJECT_WILDCARD, + type="file", + ) + ] + ) + assert lineage.inputs[2].name == f"{TEST_FOLDER}1/{TEST_OBJECT_NO_WILDCARD}" + assert lineage.inputs[3].name == f"{TEST_FOLDER}2" + assert lineage.inputs[3].facets.get("symlink") == SymlinksDatasetFacet( + identifiers=[ + SymlinksDatasetFacetIdentifiers( + namespace=f"gs://{TEST_BUCKET}", + name=f"{TEST_FOLDER}2/{TEST_OBJECT_WILDCARD}", + type="file", + ) + ] + ) + + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) + def test_get_openlineage_facets_on_complete_bq_dataset(self, hook): + hook.return_value.insert_job.return_value = MagicMock(job_id=pytest.real_job_id, error_result=False) + hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) + hook.return_value.get_client.return_value.get_table.return_value = TEST_TABLE + + expected_output_dataset_facets = { + "schema": SchemaDatasetFacet( + fields=[ + SchemaField(name="field1", type="STRING", description="field1 description"), + SchemaField(name="field2", type="INTEGER"), + ] + ), + "documentation": DocumentationDatasetFacet(description="Test Description"), + "columnLineage": ColumnLineageDatasetFacet( + fields={ + "field1": ColumnLineageDatasetFacetFieldsAdditional( + inputFields=[ + ColumnLineageDatasetFacetFieldsAdditionalInputFields( + namespace=f"gs://{TEST_BUCKET}", name=TEST_OBJECT_NO_WILDCARD, field="field1" + ) + ], + transformationType="IDENTITY", + transformationDescription="identical", + ), + "field2": ColumnLineageDatasetFacetFieldsAdditional( + inputFields=[ + ColumnLineageDatasetFacetFieldsAdditionalInputFields( + namespace=f"gs://{TEST_BUCKET}", name=TEST_OBJECT_NO_WILDCARD, field="field2" + ) + ], + transformationType="IDENTITY", + transformationDescription="identical", + ), + } + ), + } + + operator = GCSToBigQueryOperator( + project_id=JOB_PROJECT_ID, + task_id=TASK_ID, + bucket=TEST_BUCKET, + source_objects=[TEST_OBJECT_NO_WILDCARD], + destination_project_dataset_table=TEST_EXPLICIT_DEST, + ) + + operator.execute(context=mock.MagicMock()) + + lineage = operator.get_openlineage_facets_on_complete(None) + assert len(lineage.outputs) == 1 + assert lineage.outputs[0] == Dataset( + namespace="bigquery", + name=TEST_EXPLICIT_DEST, + facets=expected_output_dataset_facets, + ) + + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) + def test_get_openlineage_facets_on_complete_bq_dataset_multiple_gcs_uris(self, hook): + hook.return_value.insert_job.return_value = MagicMock(job_id=pytest.real_job_id, error_result=False) + hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) + hook.return_value.get_client.return_value.get_table.return_value = TEST_TABLE + + expected_output_dataset_facets = { + "schema": SchemaDatasetFacet( + fields=[ + SchemaField(name="field1", type="STRING", description="field1 description"), + SchemaField(name="field2", type="INTEGER"), + ] + ), + "documentation": DocumentationDatasetFacet(description="Test Description"), + "columnLineage": ColumnLineageDatasetFacet( + fields={ + "field1": ColumnLineageDatasetFacetFieldsAdditional( + inputFields=[ + ColumnLineageDatasetFacetFieldsAdditionalInputFields( + namespace=f"gs://{TEST_BUCKET}", name=TEST_OBJECT_NO_WILDCARD, field="field1" + ), + ColumnLineageDatasetFacetFieldsAdditionalInputFields( + namespace=f"gs://{TEST_BUCKET}", name="/", field="field1" + ), + ], + transformationType="IDENTITY", + transformationDescription="identical", + ), + "field2": ColumnLineageDatasetFacetFieldsAdditional( + inputFields=[ + ColumnLineageDatasetFacetFieldsAdditionalInputFields( + namespace=f"gs://{TEST_BUCKET}", name=TEST_OBJECT_NO_WILDCARD, field="field2" + ), + ColumnLineageDatasetFacetFieldsAdditionalInputFields( + namespace=f"gs://{TEST_BUCKET}", name="/", field="field2" + ), + ], + transformationType="IDENTITY", + transformationDescription="identical", + ), + } + ), + } + + operator = GCSToBigQueryOperator( + project_id=JOB_PROJECT_ID, + task_id=TASK_ID, + bucket=TEST_BUCKET, + source_objects=[TEST_OBJECT_NO_WILDCARD, TEST_OBJECT_WILDCARD], + destination_project_dataset_table=TEST_EXPLICIT_DEST, + ) + + operator.execute(context=mock.MagicMock()) + + lineage = operator.get_openlineage_facets_on_complete(None) + assert len(lineage.outputs) == 1 + assert lineage.outputs[0] == Dataset( + namespace="bigquery", + name=TEST_EXPLICIT_DEST, + facets=expected_output_dataset_facets, + ) + + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) + def test_get_openlineage_facets_on_complete_empty_table(self, hook): + hook.return_value.insert_job.return_value = MagicMock(job_id=pytest.real_job_id, error_result=False) + hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) + hook.return_value.get_client.return_value.get_table.return_value = TEST_EMPTY_TABLE + + expected_output_dataset_facets = { + "schema": SchemaDatasetFacet(fields=[]), + "documentation": DocumentationDatasetFacet(description=""), + "columnLineage": ColumnLineageDatasetFacet(fields={}), + } + + operator = GCSToBigQueryOperator( + project_id=JOB_PROJECT_ID, + task_id=TASK_ID, + bucket=TEST_BUCKET, + source_objects=[TEST_OBJECT_NO_WILDCARD, TEST_OBJECT_WILDCARD], + destination_project_dataset_table=TEST_EXPLICIT_DEST, + ) + + operator.execute(context=mock.MagicMock()) + + lineage = operator.get_openlineage_facets_on_complete(None) + assert len(lineage.inputs) == 2 + assert len(lineage.outputs) == 1 + assert lineage.outputs[0] == Dataset( + namespace="bigquery", + name=TEST_EXPLICIT_DEST, + facets=expected_output_dataset_facets, + ) + assert lineage.inputs[0] == Dataset( + namespace=f"gs://{TEST_BUCKET}", + name=TEST_OBJECT_NO_WILDCARD, + facets={"schema": SchemaDatasetFacet(fields=[])}, + ) + assert lineage.inputs[1] == Dataset( + namespace=f"gs://{TEST_BUCKET}", + name="/", + facets={ + "schema": SchemaDatasetFacet(fields=[]), + "symlink": SymlinksDatasetFacet( + identifiers=[ + SymlinksDatasetFacetIdentifiers( + namespace=f"gs://{TEST_BUCKET}", + name=TEST_OBJECT_WILDCARD, + type="file", + ) + ] + ), + }, + ) + + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) + def test_get_openlineage_facets_on_complete_full_table_multiple_gcs_uris(self, hook): + hook.return_value.insert_job.return_value = MagicMock(job_id=pytest.real_job_id, error_result=False) + hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) + hook.return_value.get_client.return_value.get_table.return_value = TEST_TABLE + hook.return_value.generate_job_id.return_value = pytest.real_job_id + + schema_facet = SchemaDatasetFacet( + fields=[ + SchemaField(name="field1", type="STRING", description="field1 description"), + SchemaField(name="field2", type="INTEGER"), + ] + ) + + expected_input_wildcard_dataset_facets = { + "schema": schema_facet, + "symlink": SymlinksDatasetFacet( + identifiers=[ + SymlinksDatasetFacetIdentifiers( + namespace=f"gs://{TEST_BUCKET}", + name=TEST_OBJECT_WILDCARD, + type="file", + ) + ] + ), + } + expected_input_no_wildcard_dataset_facets = {"schema": schema_facet} + + expected_output_dataset_facets = { + "schema": schema_facet, + "documentation": DocumentationDatasetFacet(description="Test Description"), + "columnLineage": ColumnLineageDatasetFacet( + fields={ + "field1": ColumnLineageDatasetFacetFieldsAdditional( + inputFields=[ + ColumnLineageDatasetFacetFieldsAdditionalInputFields( + namespace=f"gs://{TEST_BUCKET}", name=TEST_OBJECT_NO_WILDCARD, field="field1" + ), + ColumnLineageDatasetFacetFieldsAdditionalInputFields( + namespace=f"gs://{TEST_BUCKET}", name="/", field="field1" + ), + ], + transformationType="IDENTITY", + transformationDescription="identical", + ), + "field2": ColumnLineageDatasetFacetFieldsAdditional( + inputFields=[ + ColumnLineageDatasetFacetFieldsAdditionalInputFields( + namespace=f"gs://{TEST_BUCKET}", name=TEST_OBJECT_NO_WILDCARD, field="field2" + ), + ColumnLineageDatasetFacetFieldsAdditionalInputFields( + namespace=f"gs://{TEST_BUCKET}", name="/", field="field2" + ), + ], + transformationType="IDENTITY", + transformationDescription="identical", + ), + } + ), + } + + operator = GCSToBigQueryOperator( + project_id=JOB_PROJECT_ID, + task_id=TASK_ID, + bucket=TEST_BUCKET, + source_objects=[TEST_OBJECT_NO_WILDCARD, TEST_OBJECT_WILDCARD], + destination_project_dataset_table=TEST_EXPLICIT_DEST, + ) + + operator.execute(context=mock.MagicMock()) + + lineage = operator.get_openlineage_facets_on_complete(None) + assert len(lineage.inputs) == 2 + assert len(lineage.outputs) == 1 + assert lineage.outputs[0] == Dataset( + namespace="bigquery", name=TEST_EXPLICIT_DEST, facets=expected_output_dataset_facets + ) + + assert lineage.inputs[0] == Dataset( + namespace=f"gs://{TEST_BUCKET}", + name=TEST_OBJECT_NO_WILDCARD, + facets=expected_input_no_wildcard_dataset_facets, + ) + assert lineage.inputs[1] == Dataset( + namespace=f"gs://{TEST_BUCKET}", name="/", facets=expected_input_wildcard_dataset_facets + ) + assert lineage.run_facets == { + "externalQuery": ExternalQueryRunFacet(externalQueryId=pytest.real_job_id, source="bigquery") + } + assert lineage.job_facets == {} + class TestAsyncGCSToBigQueryOperator: @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))