kacpermuda commented on code in PR #45801: URL: https://github.com/apache/airflow/pull/45801#discussion_r1926814481
##########
providers/src/airflow/providers/google/cloud/operators/bigquery_dts.py:
##########
@@ -412,4 +414,117 @@ def execute_completed(self, context: Context, event: dict):
                 event["message"],
             )
-        return TransferRun.to_dict(transfer_run)
+        # Save as attribute for further use by OpenLineage
+        self._transfer_run = TransferRun.to_dict(transfer_run)
+        return self._transfer_run
+
+    def get_openlineage_facets_on_complete(self, _):
+        """Implement _on_complete as we need a run config to extract information."""
+        from urllib.parse import urlsplit
+
+        from airflow.providers.common.compat.openlineage.facet import Dataset, ErrorMessageRunFacet
+        from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
+        from airflow.providers.google.cloud.openlineage.utils import (
+            BIGQUERY_NAMESPACE,
+            extract_ds_name_from_gcs_path,
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+        from airflow.providers.openlineage.sqlparser import DatabaseInfo, SQLParser
+
+        if not self._transfer_run:
+            self.log.debug("No BigQuery Data Transfer configuration was found by OpenLineage.")
+            return OperatorLineage()
+
+        data_source_id = self._transfer_run["data_source_id"]
+        dest_dataset_id = self._transfer_run["destination_dataset_id"]
+        params = self._transfer_run["params"]
+
+        input_datasets, output_datasets = [], []
+        run_facets, job_facets = {}, {}
+        if data_source_id in ("google_cloud_storage", "amazon_s3", "azure_blob_storage"):
+            if data_source_id == "google_cloud_storage":
+                bucket, path = _parse_gcs_url(params["data_path_template"])  # gs://bucket...
+                namespace = f"gs://{bucket}"
+                name = extract_ds_name_from_gcs_path(path)

Review Comment:
   Not sure if it's the right approach here. In other operators I'm using this function, and sometimes some post-processing is needed on just the name, and sometimes it isn't used for a GCS URI at all. I think you are right that some refactoring is needed here: maybe renaming it to something more generic and possibly adding another function that actually returns a dataset and uses this more generic one underneath. I'd keep that for another PR if that's okay with you, as it will require changes to many different operators.
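   A rough sketch of that split (for a future PR, not this one) could look like the code below. The names `extract_ds_name_from_path` and `dataset_from_object_storage_uri` are hypothetical, and the trimming logic is a simplified stand-in for what `extract_ds_name_from_gcs_path` does today:

```python
# Hypothetical sketch only; function names and trimming rules are illustrative,
# not the actual helpers from airflow.providers.google.cloud.openlineage.utils.
from urllib.parse import urlsplit

from airflow.providers.common.compat.openlineage.facet import Dataset


def extract_ds_name_from_path(path: str) -> str:
    """Generic name extraction: cut the path at wildcards/placeholders and strip slashes."""
    for marker in ("*", "{"):
        if marker in path:
            # Keep only the directory part that precedes the wildcard/placeholder segment.
            path = path.split(marker, 1)[0].rsplit("/", 1)[0]
    return path.strip("/") or "/"


def dataset_from_object_storage_uri(uri: str) -> Dataset:
    """Build an OpenLineage Dataset from a gs:// or s3:// style URI using the generic helper."""
    parsed = urlsplit(uri)
    return Dataset(
        namespace=f"{parsed.scheme}://{parsed.netloc}",
        name=extract_ds_name_from_path(parsed.path),
    )
```

   With something along these lines, the GCS branch above could become a single call on `params["data_path_template"]`, and the S3 branch could reuse the same generic name helper (Azure would likely still need its own namespace handling).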