kacpermuda commented on code in PR #45801:
URL: https://github.com/apache/airflow/pull/45801#discussion_r1924115170


##########
providers/src/airflow/providers/google/cloud/operators/bigquery_dts.py:
##########
@@ -412,4 +414,117 @@ def execute_completed(self, context: Context, event: 
dict):
             event["message"],
         )
 
-        return TransferRun.to_dict(transfer_run)
+        # Save as attribute for further use by OpenLineage
+        self._transfer_run = TransferRun.to_dict(transfer_run)
+        return self._transfer_run
+
+    def get_openlineage_facets_on_complete(self, _):
+        """Implement _on_complete as we need a run config to extract 
information."""
+        from urllib.parse import urlsplit
+
+        from airflow.providers.common.compat.openlineage.facet import Dataset, 
ErrorMessageRunFacet
+        from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
+        from airflow.providers.google.cloud.openlineage.utils import (
+            BIGQUERY_NAMESPACE,
+            extract_ds_name_from_gcs_path,
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+        from airflow.providers.openlineage.sqlparser import DatabaseInfo, 
SQLParser
+
+        if not self._transfer_run:
+            self.log.debug("No BigQuery Data Transfer configuration was found 
by OpenLineage.")
+            return OperatorLineage()
+
+        data_source_id = self._transfer_run["data_source_id"]
+        dest_dataset_id = self._transfer_run["destination_dataset_id"]
+        params = self._transfer_run["params"]
+
+        input_datasets, output_datasets = [], []
+        run_facets, job_facets = {}, {}
+        if data_source_id in ("google_cloud_storage", "amazon_s3", 
"azure_blob_storage"):
+            if data_source_id == "google_cloud_storage":
+                bucket, path = _parse_gcs_url(params["data_path_template"])  # 
gs://bucket...
+                namespace = f"gs://{bucket}"
+                name = extract_ds_name_from_gcs_path(path)

Review Comment:
   Just to make sure I understand what you mean: do you think it would be better if 
each `if` statement were turned into a separate method? Or do you simply want each 
`if` statement to append to the input datasets separately?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to