kacpermuda commented on code in PR #45801:
URL: https://github.com/apache/airflow/pull/45801#discussion_r1926814481


##########
providers/src/airflow/providers/google/cloud/operators/bigquery_dts.py:
##########
@@ -412,4 +414,117 @@ def execute_completed(self, context: Context, event: dict):
             event["message"],
         )
 
-        return TransferRun.to_dict(transfer_run)
+        # Save as attribute for further use by OpenLineage
+        self._transfer_run = TransferRun.to_dict(transfer_run)
+        return self._transfer_run
+
+    def get_openlineage_facets_on_complete(self, _):
+        """Implement _on_complete as we need a run config to extract information."""
+        from urllib.parse import urlsplit
+
+        from airflow.providers.common.compat.openlineage.facet import Dataset, ErrorMessageRunFacet
+        from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
+        from airflow.providers.google.cloud.openlineage.utils import (
+            BIGQUERY_NAMESPACE,
+            extract_ds_name_from_gcs_path,
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+        from airflow.providers.openlineage.sqlparser import DatabaseInfo, SQLParser
+
+        if not self._transfer_run:
+            self.log.debug("No BigQuery Data Transfer configuration was found by OpenLineage.")
+            return OperatorLineage()
+
+        data_source_id = self._transfer_run["data_source_id"]
+        dest_dataset_id = self._transfer_run["destination_dataset_id"]
+        params = self._transfer_run["params"]
+
+        input_datasets, output_datasets = [], []
+        run_facets, job_facets = {}, {}
+        if data_source_id in ("google_cloud_storage", "amazon_s3", "azure_blob_storage"):
+            if data_source_id == "google_cloud_storage":
+                bucket, path = _parse_gcs_url(params["data_path_template"])  # gs://bucket...
+                namespace = f"gs://{bucket}"
+                name = extract_ds_name_from_gcs_path(path)

Review Comment:
   I'm not sure that's the right approach here. I use this function in several
operators: sometimes the name needs additional post-processing, and sometimes
the function is applied to paths that aren't gs:// URIs at all. I agree that
some refactoring is needed here, perhaps renaming the function to something more
generic and adding another function that returns a dataset and uses the generic
one underneath. I'd prefer to leave that for another PR, if that's okay with
you, since it will require changes to many different operators.

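In the diff, `execute_completed` stores the finished transfer run on `self._transfer_run`. That lets `get_openlineage_facets_on_complete` read its `data_source_id`, `destination_dataset_id` and `params` later. The hunk shown does not include `__init__`, so it is not visible here whether the attribute is initialized up front. The plain-Python sketch below assumes it is set to `None` there. That keeps the `if not self._transfer_run` guard from raising `AttributeError` when the task never reached completion. `TransferRunLikeOperator` is a hypothetical class, not the real operator. It simplifies the real operator in two ways. Its methods take a ready-made dict instead of Airflow's `context`/`event` arguments. The lineage method returns a plain dict where the real one returns `OperatorLineage`.

```python
# Hypothetical stand-in for the operator; not Airflow code.
from __future__ import annotations

from typing import Any


class TransferRunLikeOperator:
    def __init__(self) -> None:
        # Assumed: initialize the cache so the lineage method can test it safely
        # even if execute_completed never ran.
        self._transfer_run: dict[str, Any] | None = None

    def execute_completed(self, transfer_run: dict[str, Any]) -> dict[str, Any]:
        # Save the finished run for later use by the lineage method, then return it.
        self._transfer_run = transfer_run
        return self._transfer_run

    def get_openlineage_facets_on_complete(self, _task_instance: object) -> dict[str, Any]:
        if not self._transfer_run:
            # Nothing cached: return an empty result instead of raising.
            return {}
        return {
            "data_source_id": self._transfer_run["data_source_id"],
            "destination_dataset_id": self._transfer_run["destination_dataset_id"],
            "params": self._transfer_run["params"],
        }


if __name__ == "__main__":
    op = TransferRunLikeOperator()
    print(op.get_openlineage_facets_on_complete(None))  # {}
    op.execute_completed(
        {"data_source_id": "google_cloud_storage", "destination_dataset_id": "my_dataset", "params": {}}
    )
    print(op.get_openlineage_facets_on_complete(None)["data_source_id"])  # google_cloud_storage
```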


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.