Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-03-26 Thread via GitHub


github-actions[bot] closed pull request #45257: [OpenLineage] Added Openlineage 
support for DatabricksCopyIntoOperator
URL: https://github.com/apache/airflow/pull/45257


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-03-21 Thread via GitHub


github-actions[bot] commented on PR #45257:
URL: https://github.com/apache/airflow/pull/45257#issuecomment-2744712552

   This pull request has been automatically marked as stale because it has not 
had recent activity. It will be closed in 5 days if no further activity occurs. 
Thank you for your contributions.





Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-31 Thread via GitHub


kacpermuda commented on code in PR #45257:
URL: https://github.com/apache/airflow/pull/45257#discussion_r1936905117


##
providers/src/airflow/providers/databricks/operators/databricks_sql.py:
##
@@ -273,7 +273,12 @@ def __init__(
         if force_copy is not None:
             self._copy_options["force"] = "true" if force_copy else "false"

+        # These will be used by OpenLineage
+        self._sql: str | None = None
+        self._result: list[Any] = []

Review Comment:
   I think this is no longer needed?



##
providers/src/airflow/providers/databricks/operators/databricks_sql.py:
##
@@ -349,12 +360,166 @@ def _create_sql_query(self) -> str:
         return sql.strip()

     def execute(self, context: Context) -> Any:
-        sql = self._create_sql_query()
-        self.log.info("Executing: %s", sql)
+        """Execute the COPY INTO command and store the result for lineage reporting."""
+        self._sql = self._create_sql_query()
+        self.log.info("Executing SQL: %s", self._sql)
+
         hook = self._get_hook()
-        hook.run(sql)
+        hook.run(self._sql)

     def on_kill(self) -> None:
         # NB: on_kill isn't required for this operator since query cancelling gets
         # handled in `DatabricksSqlHook.run()` method which is called in `execute()`
         ...
+
+    def _parse_input_dataset(self) -> tuple[list[Any], list[Any]]:
+        """Parse file_location to build the input dataset."""
+        from airflow.providers.common.compat.openlineage.facet import Dataset, Error
+
+        input_datasets: list[Dataset] = []
+        extraction_errors: list[Error] = []
+
+        if not self.file_location:
+            return input_datasets, extraction_errors
+
+        try:
+            from urllib.parse import urlparse
+
+            parsed_uri = urlparse(self.file_location)
+            # Only process known schemes
+            if parsed_uri.scheme not in ("s3", "s3a", "s3n", "gs", "azure", "abfss", "wasbs"):
+                raise ValueError(f"Unsupported scheme: {parsed_uri.scheme}")
+
+            scheme = parsed_uri.scheme
+            namespace = f"{scheme}://{parsed_uri.netloc}"
+            path = parsed_uri.path.lstrip("/") or "/"
+            input_datasets.append(Dataset(namespace=namespace, name=path))
+        except Exception as e:
+            self.log.error("Failed to parse file_location: %s, error: %s", self.file_location, str(e))
+            extraction_errors.append(
+                Error(errorMessage=str(e), stackTrace=None, task=self.file_location, taskNumber=None)
+            )
+
+        return input_datasets, extraction_errors
+
+    def _create_sql_job_facet(self) -> tuple[dict, list[Any]]:
+        """Create SQL job facet from the SQL query."""
+        from airflow.providers.common.compat.openlineage.facet import Error, SQLJobFacet
+        from airflow.providers.openlineage.sqlparser import SQLParser
+
+        job_facets = {}
+        extraction_errors: list[Error] = []
+
+        try:
+            import re
+
+            normalized_sql = SQLParser.normalize_sql(self._sql)
+            normalized_sql = re.sub(r"\n+", "\n", re.sub(r" +", " ", normalized_sql))
+            job_facets["sql"] = SQLJobFacet(query=normalized_sql)
+        except Exception as e:
+            self.log.error("Failed creating SQL job facet: %s", str(e))

Review Comment:
   I think we usually try not to log on error level unless absolutely 
necessary. Could you review the code and adjust in other places as well? Maybe 
warning is enough? WDYT?



##
providers/tests/databricks/operators/test_databricks_copy.py:
##
@@ -253,3 +258,150 @@ def test_templating(create_task_instance_of_operator, session):
     assert task.files == "files"
     assert task.table_name == "table-name"
     assert task.databricks_conn_id == "databricks-conn-id"
+
+
[email protected]("airflow.providers.databricks.operators.databricks_sql.DatabricksSqlHook")
+def test_get_openlineage_facets_on_complete_s3(mock_hook):
+    """Test OpenLineage facets generation for S3 source."""
+    mock_hook().run.return_value = [
+        {"file": "s3://bucket/dir1/file1.csv"},
+        {"file": "s3://bucket/dir1/file2.csv"},
+    ]
+    mock_hook().get_connection().host = "databricks.com"
+
+    op = DatabricksCopyIntoOperator(
+        task_id="test",
+        table_name="schema.table",
+        file_location="s3://bucket/dir1",
+        file_format="CSV",
+    )
+    op._sql = "COPY INTO schema.table FROM 's3://bucket/dir1'"
+    op._result = mock_hook().run.return_value
+
+    lineage = op.get_openlineage_facets_on_complete(None)
+
+    assert lineage == OperatorLineage(
+        inputs=[Dataset(namespace="s3://bucket", name="dir1")],
+        outputs=[Dataset(namespace="databricks://databricks.com", name="schema.table")],
+        job_facets={"sql": SQLJobFacet(query="COPY INTO schema.table FROM 's3://bucket/dir1'")},
+        run_facets={},
+    )
+
+
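The (namespace, name) split this test asserts on can be sketched outside Airflow. Below is a minimal standalone version of the URI handling; the helper name `to_openlineage_dataset` is illustrative, not part of the PR:

```python
from urllib.parse import urlparse


def to_openlineage_dataset(file_location: str) -> tuple[str, str]:
    """Split a cloud-storage URI into an OpenLineage (namespace, name) pair."""
    parsed = urlparse(file_location)
    # Only process known schemes, mirroring the check in the PR
    if parsed.scheme not in ("s3", "s3a", "s3n", "gs", "azure", "abfss", "wasbs"):
        raise ValueError(f"Unsupported scheme: {parsed.scheme}")
    namespace = f"{parsed.scheme}://{parsed.netloc}"
    name = parsed.path.lstrip("/") or "/"
    return namespace, name


print(to_openlineage_dataset("s3://bucket/dir1/file1.csv"))  # ('s3://bucket', 'dir1/file1.csv')
```

This is why the test expects `Dataset(namespace="s3://bucket", name="dir1")` for `file_location="s3://bucket/dir1"`.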

Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-25 Thread via GitHub


rahul-madaan commented on PR #45257:
URL: https://github.com/apache/airflow/pull/45257#issuecomment-2614077090

   @kacpermuda I have addressed all the comments, please take a look. I have 
tested it on s3 and it is working perfectly.





Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-24 Thread via GitHub


rahul-madaan commented on code in PR #45257:
URL: https://github.com/apache/airflow/pull/45257#discussion_r1929280068


##
providers/src/airflow/providers/databricks/operators/databricks_sql.py:
##
@@ -349,12 +360,233 @@ def _create_sql_query(self) -> str:
         return sql.strip()

     def execute(self, context: Context) -> Any:
-        sql = self._create_sql_query()
-        self.log.info("Executing: %s", sql)
+        """Execute the COPY INTO command and store the result for lineage reporting."""
+        self._sql = self._create_sql_query()
+        self.log.info("Executing SQL: %s", self._sql)
+
         hook = self._get_hook()
-        hook.run(sql)
+        result = hook.run(self._sql, handler=lambda cur: cur.fetchall())
+        # Convert to list, handling the case where result might be None
+        self._result = list(result) if result is not None else []

     def on_kill(self) -> None:
         # NB: on_kill isn't required for this operator since query cancelling gets
         # handled in `DatabricksSqlHook.run()` method which is called in `execute()`
         ...
+
+    def get_openlineage_facets_on_complete(self, task_instance):
+        """
+        Compute OpenLineage facets for the COPY INTO command.
+
+        Attempts to parse input files (from S3, GCS, Azure Blob, etc.) and build an
+        input dataset list and an output dataset (the Delta table).
+        """
+        import re
+        from urllib.parse import urlparse
+
+        from airflow.providers.common.compat.openlineage.facet import (
+            Dataset,
+            Error,
+            ExternalQueryRunFacet,
+            ExtractionErrorRunFacet,
+            SQLJobFacet,
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+        from airflow.providers.openlineage.sqlparser import SQLParser
+
+        if not self._sql:
+            self.log.warning("No SQL query found, returning empty OperatorLineage.")
+            return OperatorLineage()
+
+        input_datasets = []
+        extraction_errors = []
+        job_facets = {}
+        run_facets = {}
+
+        # Parse file_location to build the input dataset (if possible).
+        if self.file_location:
+            try:
+                parsed_uri = urlparse(self.file_location)
+                # Only process known schemes
+                if parsed_uri.scheme not in ("s3", "s3a", "s3n", "gs", "azure", "abfss", "wasbs"):
+                    raise ValueError(f"Unsupported scheme: {parsed_uri.scheme}")
+
+                # Keep original scheme for s3/s3a/s3n
+                scheme = parsed_uri.scheme
+                namespace = f"{scheme}://{parsed_uri.netloc}"
+                path = parsed_uri.path.lstrip("/") or "/"
+                input_datasets.append(Dataset(namespace=namespace, name=path))
+            except Exception as e:
+                self.log.error("Failed to parse file_location: %s, error: %s", self.file_location, str(e))
+                extraction_errors.append(
+                    Error(errorMessage=str(e), stackTrace=None, task=self.file_location, taskNumber=None)
+                )
+
+        # Build SQLJobFacet
+        try:
+            normalized_sql = SQLParser.normalize_sql(self._sql)
+            normalized_sql = re.sub(r"\n+", "\n", re.sub(r" +", " ", normalized_sql))
+            job_facets["sql"] = SQLJobFacet(query=normalized_sql)
+        except Exception as e:
+            self.log.error("Failed creating SQL job facet: %s", str(e))
+            extraction_errors.append(
+                Error(errorMessage=str(e), stackTrace=None, task="sql_facet_creation", taskNumber=None)
+            )
+
+        # Add extraction error facet if there are any errors
+        if extraction_errors:
+            run_facets["extractionError"] = ExtractionErrorRunFacet(
+                totalTasks=1,
+                failedTasks=len(extraction_errors),
+                errors=extraction_errors,
+            )
+            # Return only error facets for invalid URIs
+            return OperatorLineage(
+                inputs=[],
+                outputs=[],
+                job_facets=job_facets,
+                run_facets=run_facets,
+            )
+
+        # Only proceed with output dataset if input was valid
+        output_dataset = None
+        if self.table_name:
+            try:
+                table_parts = self.table_name.split(".")
+                if len(table_parts) == 3:  # catalog.schema.table
+                    catalog, schema, table = table_parts
+                elif len(table_parts) == 2:  # schema.table
+                    catalog = None
+                    schema, table = table_parts
+                else:
+                    catalog = None
+                    schema = None
+                    table = self.table_name
+
+                hook = self._get_hook()
+                conn = hook.get_connection(hook.databricks_conn_id)
+                out

Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-24 Thread via GitHub


rahul-madaan commented on code in PR #45257:
URL: https://github.com/apache/airflow/pull/45257#discussion_r1929276031


##
dev/breeze/tests/test_selective_checks.py:
##
@@ -1762,13 +1762,13 @@ def test_expected_output_push(
                 "skip-providers-tests": "false",
                 "test-groups": "['core', 'providers']",
                 "docs-build": "true",
-                "docs-list-as-string": "apache-airflow amazon common.compat common.io common.sql dbt.cloud ftp google mysql openlineage postgres sftp snowflake trino",
+                "docs-list-as-string": "apache-airflow amazon common.compat common.io common.sql databricks dbt.cloud ftp google mysql openlineage postgres sftp snowflake trino",
                 "skip-pre-commits": "check-provider-yaml-valid,flynt,identity,lint-helm-chart,mypy-airflow,mypy-dev,mypy-docs,mypy-providers,mypy-task-sdk,"
                 "ts-compile-format-lint-ui,ts-compile-format-lint-www",
                 "run-kubernetes-tests": "false",
                 "upgrade-to-newer-dependencies": "false",
                 "core-test-types-list-as-string": "API Always CLI Core Operators Other Serialization WWW",
-                "providers-test-types-list-as-string": "Providers[amazon] Providers[common.compat,common.io,common.sql,dbt.cloud,ftp,mysql,openlineage,postgres,sftp,snowflake,trino] Providers[google]",
+                "providers-test-types-list-as-string": "Providers[amazon] Providers[common.compat,common.io,common.sql,databricks,dbt.cloud,ftp,mysql,openlineage,postgres,sftp,snowflake,trino] Providers[google]",

Review Comment:
   removed these changes.






Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-24 Thread via GitHub


rahul-madaan commented on code in PR #45257:
URL: https://github.com/apache/airflow/pull/45257#discussion_r1929282204


##
providers/src/airflow/providers/databricks/operators/databricks_sql.py:
##

Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-24 Thread via GitHub


rahul-madaan commented on code in PR #45257:
URL: https://github.com/apache/airflow/pull/45257#discussion_r1929282412


##
providers/src/airflow/providers/databricks/operators/databricks_sql.py:
##

Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-24 Thread via GitHub


rahul-madaan commented on code in PR #45257:
URL: https://github.com/apache/airflow/pull/45257#discussion_r1929281714


##
providers/src/airflow/providers/databricks/operators/databricks_sql.py:
##

Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-24 Thread via GitHub


rahul-madaan commented on code in PR #45257:
URL: https://github.com/apache/airflow/pull/45257#discussion_r1929280681


##
providers/src/airflow/providers/databricks/operators/databricks_sql.py:
##

Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-24 Thread via GitHub


rahul-madaan commented on code in PR #45257:
URL: https://github.com/apache/airflow/pull/45257#discussion_r1929278630


##
providers/src/airflow/providers/databricks/operators/databricks_sql.py:
##
@@ -349,12 +360,233 @@ def _create_sql_query(self) -> str:
         return sql.strip()

     def execute(self, context: Context) -> Any:
-        sql = self._create_sql_query()
-        self.log.info("Executing: %s", sql)
+        """Execute the COPY INTO command and store the result for lineage reporting."""
+        self._sql = self._create_sql_query()
+        self.log.info("Executing SQL: %s", self._sql)
+
         hook = self._get_hook()
-        hook.run(sql)
+        result = hook.run(self._sql, handler=lambda cur: cur.fetchall())
+        # Convert to list, handling the case where result might be None
+        self._result = list(result) if result is not None else []

     def on_kill(self) -> None:
         # NB: on_kill isn't required for this operator since query cancelling gets
         # handled in `DatabricksSqlHook.run()` method which is called in `execute()`
         ...
+
+    def get_openlineage_facets_on_complete(self, task_instance):
+        """
+        Compute OpenLineage facets for the COPY INTO command.
+
+        Attempts to parse input files (from S3, GCS, Azure Blob, etc.) and build an
+        input dataset list and an output dataset (the Delta table).
+        """
+        import re
+        from urllib.parse import urlparse
+
+        from airflow.providers.common.compat.openlineage.facet import (
+            Dataset,
+            Error,
+            ExternalQueryRunFacet,
+            ExtractionErrorRunFacet,
+            SQLJobFacet,
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+        from airflow.providers.openlineage.sqlparser import SQLParser
+
+        if not self._sql:
+            self.log.warning("No SQL query found, returning empty OperatorLineage.")
+            return OperatorLineage()
+
+        input_datasets = []
+        extraction_errors = []
+        job_facets = {}
+        run_facets = {}
+
+        # Parse file_location to build the input dataset (if possible).
+        if self.file_location:
+            try:
+                parsed_uri = urlparse(self.file_location)
+                # Only process known schemes
+                if parsed_uri.scheme not in ("s3", "s3a", "s3n", "gs", "azure", "abfss", "wasbs"):
+                    raise ValueError(f"Unsupported scheme: {parsed_uri.scheme}")
+
+                # Keep original scheme for s3/s3a/s3n
+                scheme = parsed_uri.scheme
+                namespace = f"{scheme}://{parsed_uri.netloc}"
+                path = parsed_uri.path.lstrip("/") or "/"
+                input_datasets.append(Dataset(namespace=namespace, name=path))
+            except Exception as e:
+                self.log.error("Failed to parse file_location: %s, error: %s", self.file_location, str(e))
+                extraction_errors.append(
+                    Error(errorMessage=str(e), stackTrace=None, task=self.file_location, taskNumber=None)
+                )
+
+        # Build SQLJobFacet
+        try:
+            normalized_sql = SQLParser.normalize_sql(self._sql)
+            normalized_sql = re.sub(r"\n+", "\n", re.sub(r" +", " ", normalized_sql))
Review Comment:
   This is done in CopyFromExternalStageToSnowflakeOperator OL implementation 
here 
https://github.com/apache/airflow/blob/c600a95aaf2df80ca59889b22f741bc8289138e1/providers/src/airflow/providers/snowflake/transfers/copy_into_snowflake.py#L287
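For reference, the whitespace-normalization step under discussion (the regex pass that follows `SQLParser.normalize_sql` in the PR) can be reproduced standalone; `collapse_ws` is an illustrative name, and this sketch covers only the regex step:

```python
import re


def collapse_ws(sql: str) -> str:
    """Collapse runs of spaces to one space, then runs of newlines to one newline."""
    return re.sub(r"\n+", "\n", re.sub(r" +", " ", sql))


print(collapse_ws("COPY INTO  t\n\n\nFROM   's3://b/d'"))  # COPY INTO t\nFROM 's3://b/d'
```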
 






Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-24 Thread via GitHub


rahul-madaan commented on code in PR #45257:
URL: https://github.com/apache/airflow/pull/45257#discussion_r1929276663


##
providers/src/airflow/providers/databricks/operators/databricks_sql.py:
##
@@ -349,12 +360,233 @@ def _create_sql_query(self) -> str:
         return sql.strip()

     def execute(self, context: Context) -> Any:
-        sql = self._create_sql_query()
-        self.log.info("Executing: %s", sql)
+        """Execute the COPY INTO command and store the result for lineage reporting."""
+        self._sql = self._create_sql_query()
+        self.log.info("Executing SQL: %s", self._sql)
+
         hook = self._get_hook()
-        hook.run(sql)
+        result = hook.run(self._sql, handler=lambda cur: cur.fetchall())
+        # Convert to list, handling the case where result might be None
+        self._result = list(result) if result is not None else []

Review Comment:
   I updated the code to not save the result now. It is not required.






Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-24 Thread via GitHub


rahul-madaan commented on code in PR #45257:
URL: https://github.com/apache/airflow/pull/45257#discussion_r1929275812


##
generated/provider_dependencies.json:
##


Review Comment:
   removed the changes






Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-07 Thread via GitHub


kacpermuda commented on code in PR #45257:
URL: https://github.com/apache/airflow/pull/45257#discussion_r1905679785


##
providers/src/airflow/providers/databricks/operators/databricks_sql.py:
##
@@ -349,12 +360,233 @@ def _create_sql_query(self) -> str:
 return sql.strip()
 
 def execute(self, context: Context) -> Any:
-sql = self._create_sql_query()
-self.log.info("Executing: %s", sql)
+"""Execute the COPY INTO command and store the result for lineage reporting."""
+self._sql = self._create_sql_query()
+self.log.info("Executing SQL: %s", self._sql)
+
 hook = self._get_hook()
-hook.run(sql)
+result = hook.run(self._sql, handler=lambda cur: cur.fetchall())
+# Convert to list, handling the case where result might be None
+self._result = list(result) if result is not None else []
 
 def on_kill(self) -> None:
 # NB: on_kill isn't required for this operator since query cancelling 
gets
 # handled in `DatabricksSqlHook.run()` method which is called in 
`execute()`
 ...
+
+def get_openlineage_facets_on_complete(self, task_instance):
+"""
+Compute OpenLineage facets for the COPY INTO command.
+
+Attempts to parse input files (from S3, GCS, Azure Blob, etc.) and 
build an
+input dataset list and an output dataset (the Delta table).
+"""
+import re
+from urllib.parse import urlparse
+
+from airflow.providers.common.compat.openlineage.facet import (
+Dataset,
+Error,
+ExternalQueryRunFacet,
+ExtractionErrorRunFacet,
+SQLJobFacet,
+)
+from airflow.providers.openlineage.extractors import OperatorLineage
+from airflow.providers.openlineage.sqlparser import SQLParser
+
+if not self._sql:
+self.log.warning("No SQL query found, returning empty OperatorLineage.")
+return OperatorLineage()
+
+input_datasets = []
+extraction_errors = []
+job_facets = {}
+run_facets = {}
+
+# Parse file_location to build the input dataset (if possible).
+if self.file_location:
+try:
+parsed_uri = urlparse(self.file_location)
+# Only process known schemes
+if parsed_uri.scheme not in ("s3", "s3a", "s3n", "gs", "azure", "abfss", "wasbs"):
+raise ValueError(f"Unsupported scheme: {parsed_uri.scheme}")
+
+# Keep original scheme for s3/s3a/s3n
+scheme = parsed_uri.scheme
+namespace = f"{scheme}://{parsed_uri.netloc}"
+path = parsed_uri.path.lstrip("/") or "/"
+input_datasets.append(Dataset(namespace=namespace, name=path))
+except Exception as e:
+self.log.error("Failed to parse file_location: %s, error: %s", self.file_location, str(e))
+extraction_errors.append(
+Error(errorMessage=str(e), stackTrace=None, task=self.file_location, taskNumber=None)
+)
+
+# Build SQLJobFacet
+try:
+normalized_sql = SQLParser.normalize_sql(self._sql)
+normalized_sql = re.sub(r"\n+", "\n", re.sub(r" +", " ", normalized_sql))

Review Comment:
   I think we usually only use `SQLParser.normalize_sql` for the SQLJobFacet. 
What is the reason for these additional replacements? Could you add a comment 
if they are necessary?
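
   For context, a minimal illustration of what the two extra `re.sub` passes in the diff do on top of plain normalization (`collapse_whitespace` is an illustrative name, not provider code):

```python
import re


def collapse_whitespace(sql: str) -> str:
    """Collapse runs of spaces to one space, then runs of newlines to one newline,
    mirroring the two re.sub calls in the diff above."""
    return re.sub(r"\n+", "\n", re.sub(r" +", " ", sql))
```

   For example, `collapse_whitespace("SELECT  1\n\n\nFROM   t")` returns `"SELECT 1\nFROM t"`.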



##
providers/src/airflow/providers/databricks/operators/databricks_sql.py:
##
@@ -349,12 +360,233 @@ def _create_sql_query(self) -> str:
 return sql.strip()
 
 def execute(self, context: Context) -> Any:
-sql = self._create_sql_query()
-self.log.info("Executing: %s", sql)
+"""Execute the COPY INTO command and store the result for lineage reporting."""
+self._sql = self._create_sql_query()
+self.log.info("Executing SQL: %s", self._sql)
+
 hook = self._get_hook()
-hook.run(sql)
+result = hook.run(self._sql, handler=lambda cur: cur.fetchall())
+# Convert to list, handling the case where result might be None
+self._result = list(result) if result is not None else []

Review Comment:
   What is the result saved here? Later in the code it appears to be query_ids, 
but are we sure that is what we are getting? What if somebody submits a query 
that reads a million rows? I'm asking because this looks like a place that could 
add a lot of processing even for users who do not use the OpenLineage 
integration.
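
   The concern above can be illustrated with a minimal stand-in (`FakeHook`/`FakeCursor` are hypothetical, not the Databricks hook API): passing a `fetchall` handler materializes every row, while omitting the handler discards them.

```python
class FakeCursor:
    """Stand-in cursor holding pre-canned rows."""

    def __init__(self, rows):
        self._rows = rows

    def fetchall(self):
        # Materializes every row into memory, however many there are.
        return list(self._rows)


class FakeHook:
    """Stand-in hook mirroring the common run(sql, handler=None) contract."""

    def __init__(self, rows):
        self._rows = rows

    def run(self, sql, handler=None):
        cur = FakeCursor(self._rows)
        # Only invoke the handler if one was supplied; otherwise rows stay unfetched.
        return handler(cur) if handler is not None else None


hook = FakeHook(rows=[(i,) for i in range(5)])
materialized = hook.run("COPY INTO t ...", handler=lambda cur: cur.fetchall())
discarded = hook.run("COPY INTO t ...")  # no handler: rows are never fetched
```

   With a large COPY INTO result set, the handler-free call avoids holding the rows in memory for users who do not need lineage metadata.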



##
providers/src/airflow/providers/databricks/operators/databricks_sql.py:
##
@@ -349,12 +360,233 @@ def _create_sql_query(self) -> str:
 return sql.strip()
 
 def execute(self, context: Context) -> Any:
-sql = self._create_sql_query()
-self.log.in

Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-07 Thread via GitHub


rahul-madaan commented on code in PR #45257:
URL: https://github.com/apache/airflow/pull/45257#discussion_r1905070657


##
dev/breeze/tests/test_selective_checks.py:
##
@@ -1762,13 +1762,13 @@ def test_expected_output_push(
 "skip-providers-tests": "false",
 "test-groups": "['core', 'providers']",
 "docs-build": "true",
-"docs-list-as-string": "apache-airflow amazon common.compat 
common.io common.sql dbt.cloud ftp google mysql openlineage postgres sftp 
snowflake trino",
+"docs-list-as-string": "apache-airflow amazon common.compat 
common.io common.sql databricks dbt.cloud ftp google mysql openlineage postgres 
sftp snowflake trino",
 "skip-pre-commits": 
"check-provider-yaml-valid,flynt,identity,lint-helm-chart,mypy-airflow,mypy-dev,mypy-docs,mypy-providers,mypy-task-sdk,"
 "ts-compile-format-lint-ui,ts-compile-format-lint-www",
 "run-kubernetes-tests": "false",
 "upgrade-to-newer-dependencies": "false",
 "core-test-types-list-as-string": "API Always CLI Core 
Operators Other Serialization WWW",
-"providers-test-types-list-as-string": "Providers[amazon] 
Providers[common.compat,common.io,common.sql,dbt.cloud,ftp,mysql,openlineage,postgres,sftp,snowflake,trino]
 Providers[google]",
+"providers-test-types-list-as-string": "Providers[amazon] 
Providers[common.compat,common.io,common.sql,databricks,dbt.cloud,ftp,mysql,openlineage,postgres,sftp,snowflake,trino]
 Providers[google]",

Review Comment:
   Again, this was done because an assertion in one of the tests was failing. I 
believe this is not just for the AWS provider; the ID of the test says "Trigger 
openlineage and related providers tests when Assets files changed".






Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-07 Thread via GitHub


rahul-madaan commented on code in PR #45257:
URL: https://github.com/apache/airflow/pull/45257#discussion_r1905066839


##
generated/provider_dependencies.json:
##


Review Comment:
   I was getting an error in one of the tests. I ran the recommended command 
and the file got updated automatically. Once it was updated, the test started 
passing.






Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-06 Thread via GitHub


jscheffl commented on code in PR #45257:
URL: https://github.com/apache/airflow/pull/45257#discussion_r1904701780


##
dev/breeze/tests/test_selective_checks.py:
##
@@ -1762,13 +1762,13 @@ def test_expected_output_push(
 "skip-providers-tests": "false",
 "test-groups": "['core', 'providers']",
 "docs-build": "true",
-"docs-list-as-string": "apache-airflow amazon common.compat 
common.io common.sql dbt.cloud ftp google mysql openlineage postgres sftp 
snowflake trino",
+"docs-list-as-string": "apache-airflow amazon common.compat 
common.io common.sql databricks dbt.cloud ftp google mysql openlineage postgres 
sftp snowflake trino",
 "skip-pre-commits": 
"check-provider-yaml-valid,flynt,identity,lint-helm-chart,mypy-airflow,mypy-dev,mypy-docs,mypy-providers,mypy-task-sdk,"
 "ts-compile-format-lint-ui,ts-compile-format-lint-www",
 "run-kubernetes-tests": "false",
 "upgrade-to-newer-dependencies": "false",
 "core-test-types-list-as-string": "API Always CLI Core 
Operators Other Serialization WWW",
-"providers-test-types-list-as-string": "Providers[amazon] 
Providers[common.compat,common.io,common.sql,dbt.cloud,ftp,mysql,openlineage,postgres,sftp,snowflake,trino]
 Providers[google]",
+"providers-test-types-list-as-string": "Providers[amazon] 
Providers[common.compat,common.io,common.sql,databricks,dbt.cloud,ftp,mysql,openlineage,postgres,sftp,snowflake,trino]
 Providers[google]",

Review Comment:
   Why do you add a dependency for databricks to AWS?






Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-06 Thread via GitHub


jscheffl commented on code in PR #45257:
URL: https://github.com/apache/airflow/pull/45257#discussion_r1904701418


##
generated/provider_dependencies.json:
##


Review Comment:
   This is generated code - did you make changes manually here? The source 
should be the provider.yaml in the databricks provider.






Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-06 Thread via GitHub


jscheffl commented on PR #45257:
URL: https://github.com/apache/airflow/pull/45257#issuecomment-2574036726

   > errors are not getting resolved even after rebasing. It somehow started 
coming after Jarek rebased it. Should I reset the branch and cherrypick my 
commits to resolve this?
   
   Usually not needed - but I agree, the errors seem unrelated to your 
changes... at first glance.





Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-06 Thread via GitHub


rahul-madaan commented on PR #45257:
URL: https://github.com/apache/airflow/pull/45257#issuecomment-2573936622

   The errors are not getting resolved even after rebasing. They somehow 
started appearing after Jarek rebased the branch. Should I reset the branch and 
cherry-pick my commits to resolve this?





Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-06 Thread via GitHub


kacpermuda commented on PR #45257:
URL: https://github.com/apache/airflow/pull/45257#issuecomment-2572897340

   @rahul-madaan Can you please rebase and make sure all the CI is green? I'll 
try to review the PR this week :) 





Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-06 Thread via GitHub


rahul-madaan commented on PR #45257:
URL: https://github.com/apache/airflow/pull/45257#issuecomment-2572817109

   @kacpermuda @potiuk A gentle reminder, please review the PR whenever you 
find some time this week.





Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2025-01-02 Thread via GitHub


potiuk commented on PR #45257:
URL: https://github.com/apache/airflow/pull/45257#issuecomment-2567701976

   @rahul-madaan -> I rebased it. We found an issue with @jscheffl in the new 
caching scheme - fixed in https://github.com/apache/airflow/pull/45347 - that 
would run the "main" version of the tests.





Re: [PR] [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator [airflow]

2024-12-28 Thread via GitHub


rahul-madaan commented on PR #45257:
URL: https://github.com/apache/airflow/pull/45257#issuecomment-2564271610

   @kacpermuda @potiuk could you please take a look at the PR and approve?

