mobuchowski commented on code in PR #67894:
URL: https://github.com/apache/airflow/pull/67894#discussion_r3421277612
##########
providers/databricks/src/airflow/providers/databricks/utils/openlineage.py:
##########
@@ -320,3 +326,128 @@ def emit_openlineage_events_for_databricks_queries(
log.info("OpenLineage has successfully finished processing information about Databricks queries.")
return
+
+
+def _is_openlineage_provider_accessible() -> bool:
+ """
+ Check if the OpenLineage provider is accessible.
+
+ This function attempts to import the necessary OpenLineage modules and checks if the provider
+ is enabled and the listener is available.
+
+ Returns:
+ True if the OpenLineage provider is accessible, False otherwise.
+ """
+ try:
+ from airflow.providers.openlineage.conf import is_disabled
+ from airflow.providers.openlineage.plugins.listener import get_openlineage_listener
+ except ImportError:
+ log.debug("OpenLineage provider could not be imported.")
+ return False
+
+ if is_disabled():
+ log.debug("OpenLineage provider is disabled.")
+ return False
+
+ if not get_openlineage_listener():
+ log.debug("OpenLineage listener could not be found.")
+ return False
+
+ return True
+
+
+def _extract_new_clusters_from_databricks_job(job: dict) -> list[dict]:
Review Comment:
_extract_new_clusters_from_databricks_job iterates over tasks[] and
job_clusters[] but never recurses into for_each_task.task.new_cluster. For a
runs/submit payload that uses Databricks for_each_task with an inline
new_cluster, OpenLineage property injection is silently skipped for that
cluster: the Spark job runs without the configured OL properties, and the user
gets no warning.