rahul-madaan commented on code in PR #45257:
URL: https://github.com/apache/airflow/pull/45257#discussion_r1929278630
##########
providers/src/airflow/providers/databricks/operators/databricks_sql.py:
##########
@@ -349,12 +360,233 @@ def _create_sql_query(self) -> str:
return sql.strip()
def execute(self, context: Context) -> Any:
- sql = self._create_sql_query()
- self.log.info("Executing: %s", sql)
+        """Execute the COPY INTO command and store the result for lineage reporting."""
+ self._sql = self._create_sql_query()
+ self.log.info("Executing SQL: %s", self._sql)
+
hook = self._get_hook()
- hook.run(sql)
+ result = hook.run(self._sql, handler=lambda cur: cur.fetchall())
+ # Convert to list, handling the case where result might be None
+ self._result = list(result) if result is not None else []
def on_kill(self) -> None:
        # NB: on_kill isn't required for this operator since query cancelling gets
        # handled in `DatabricksSqlHook.run()` method which is called in `execute()`
...
+
+ def get_openlineage_facets_on_complete(self, task_instance):
+ """
+ Compute OpenLineage facets for the COPY INTO command.
+
+        Attempts to parse input files (from S3, GCS, Azure Blob, etc.) and build an
+        input dataset list and an output dataset (the Delta table).
+ """
+ import re
+ from urllib.parse import urlparse
+
+ from airflow.providers.common.compat.openlineage.facet import (
+ Dataset,
+ Error,
+ ExternalQueryRunFacet,
+ ExtractionErrorRunFacet,
+ SQLJobFacet,
+ )
+ from airflow.providers.openlineage.extractors import OperatorLineage
+ from airflow.providers.openlineage.sqlparser import SQLParser
+
+ if not self._sql:
+            self.log.warning("No SQL query found, returning empty OperatorLineage.")
+ return OperatorLineage()
+
+ input_datasets = []
+ extraction_errors = []
+ job_facets = {}
+ run_facets = {}
+
+ # Parse file_location to build the input dataset (if possible).
+ if self.file_location:
+ try:
+ parsed_uri = urlparse(self.file_location)
+ # Only process known schemes
+                if parsed_uri.scheme not in ("s3", "s3a", "s3n", "gs", "azure", "abfss", "wasbs"):
+                    raise ValueError(f"Unsupported scheme: {parsed_uri.scheme}")
+
+ # Keep original scheme for s3/s3a/s3n
+ scheme = parsed_uri.scheme
+ namespace = f"{scheme}://{parsed_uri.netloc}"
+ path = parsed_uri.path.lstrip("/") or "/"
+ input_datasets.append(Dataset(namespace=namespace, name=path))
+ except Exception as e:
+                self.log.error("Failed to parse file_location: %s, error: %s", self.file_location, str(e))
+ extraction_errors.append(
+                    Error(errorMessage=str(e), stackTrace=None, task=self.file_location, taskNumber=None)
+ )
+
+ # Build SQLJobFacet
+ try:
+ normalized_sql = SQLParser.normalize_sql(self._sql)
+            normalized_sql = re.sub(r"\n+", "\n", re.sub(r" +", " ", normalized_sql))
Review Comment:
This is done in CopyFromExternalStageToSnowflakeOperator OL implementation
here
https://github.com/apache/airflow/blob/c600a95aaf2df80ca59889b22f741bc8289138e1/providers/src/airflow/providers/snowflake/transfers/copy_into_snowflake.py#L287
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]