mobuchowski commented on code in PR #45801:
URL: https://github.com/apache/airflow/pull/45801#discussion_r1924105090
##########
providers/src/airflow/providers/google/cloud/operators/bigquery_dts.py:
##########
@@ -412,4 +414,117 @@ def execute_completed(self, context: Context, event:
dict):
event["message"],
)
- return TransferRun.to_dict(transfer_run)
+ # Save as attribute for further use by OpenLineage
+ self._transfer_run = TransferRun.to_dict(transfer_run)
+ return self._transfer_run
+
+ def get_openlineage_facets_on_complete(self, _):
+ """Implement _on_complete as we need a run config to extract
information."""
+ from urllib.parse import urlsplit
+
+ from airflow.providers.common.compat.openlineage.facet import Dataset,
ErrorMessageRunFacet
+ from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
+ from airflow.providers.google.cloud.openlineage.utils import (
+ BIGQUERY_NAMESPACE,
+ extract_ds_name_from_gcs_path,
+ )
+ from airflow.providers.openlineage.extractors import OperatorLineage
+ from airflow.providers.openlineage.sqlparser import DatabaseInfo,
SQLParser
+
+ if not self._transfer_run:
+ self.log.debug("No BigQuery Data Transfer configuration was found
by OpenLineage.")
+ return OperatorLineage()
+
+ data_source_id = self._transfer_run["data_source_id"]
+ dest_dataset_id = self._transfer_run["destination_dataset_id"]
+ params = self._transfer_run["params"]
+
+ input_datasets, output_datasets = [], []
+ run_facets, job_facets = {}, {}
+ if data_source_id in ("google_cloud_storage", "amazon_s3",
"azure_blob_storage"):
+ if data_source_id == "google_cloud_storage":
+ bucket, path = _parse_gcs_url(params["data_path_template"]) #
gs://bucket...
+ namespace = f"gs://{bucket}"
+ name = extract_ds_name_from_gcs_path(path)
Review Comment:
nit: maybe make those methods return `Dataset`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]