kacpermuda commented on code in PR #67894:
URL: https://github.com/apache/airflow/pull/67894#discussion_r3350554693
##########
providers/databricks/src/airflow/providers/databricks/utils/openlineage.py:
##########
@@ -320,3 +326,128 @@ def emit_openlineage_events_for_databricks_queries(
log.info("OpenLineage has successfully finished processing information
about Databricks queries.")
return
+
+
def _is_openlineage_provider_accessible() -> bool:
    """
    Tell whether OpenLineage can be used from the current process.

    The check passes only when the OpenLineage provider modules import cleanly,
    the provider does not report itself as disabled, and a listener object is
    returned. Every failing condition is logged at debug level.

    Returns:
        True if the OpenLineage provider is accessible, False otherwise.
    """
    try:
        from airflow.providers.openlineage.conf import is_disabled
        from airflow.providers.openlineage.plugins.listener import get_openlineage_listener
    except ImportError:
        log.debug("OpenLineage provider could not be imported.")
        return False

    # Checks run in order and stop at the first failure, so the listener is
    # never looked up when the provider is disabled.
    problem = None
    if is_disabled():
        problem = "OpenLineage provider is disabled."
    elif not get_openlineage_listener():
        problem = "OpenLineage listener could not be found."

    if problem is not None:
        log.debug(problem)
        return False
    return True
+
+
+def _extract_new_clusters_from_databricks_job(job: dict) -> list[dict]:
+ """
+ Collect every ``new_cluster`` definition that can carry Spark properties
in a Databricks job.
+
+ A ``runs/submit`` payload can define a ``new_cluster`` in three places: at
the top level
+ (single-task form), inline on each task (``tasks[].new_cluster``), or as a
shared job cluster
+ (``job_clusters[].new_cluster``) referenced by tasks through
``job_cluster_key``. Tasks running on
+ an ``existing_cluster_id`` have no ``new_cluster`` to mutate and are
skipped.
+
+ Args:
+ job: The Databricks ``runs/submit`` job definition.
+
+ Returns:
+ The list of ``new_cluster`` dicts found in the job definition.
+ """
+ new_clusters = []
+ if isinstance(job.get("new_cluster"), dict):
+ new_clusters.append(job["new_cluster"])
+ for key in ("tasks", "job_clusters"):
+ if isinstance(job.get(key), list):
+ new_clusters.extend(
+ item["new_cluster"] for item in job[key] if
isinstance(item.get("new_cluster"), dict)
+ )
+ return new_clusters
+
+
+def inject_openlineage_properties_into_databricks_job(
+ job: dict, context: Context, inject_parent_job_info: bool,
inject_transport_info: bool
+) -> dict:
+ """
+ Inject OpenLineage properties into a Databricks job definition.
+
+ This function does not remove existing configurations or modify the job
definition in any way,
+ except to add the required OpenLineage properties if they are not already
present.
+
+ The entire properties injection process will be skipped if any condition
is met:
+ - The OpenLineage provider is not accessible.
+ - The job has no ``new_cluster`` definition to inject Spark properties
into (e.g. it only uses
+ an ``existing_cluster_id``, whose Spark configuration is fixed at
cluster creation time).
+ - Both `inject_parent_job_info` and `inject_transport_info` are set to
False.
+
+ Additionally, specific information will not be injected if relevant
OpenLineage properties already
+ exist.
+
+ Parent job information will not be injected if:
+ - Any property prefixed with `spark.openlineage.parent` exists.
+ - `inject_parent_job_info` is False.
+ Transport information will not be injected if:
+ - Any property prefixed with `spark.openlineage.transport` exists.
+ - `inject_transport_info` is False.
+
+ Args:
+ job: The original Databricks ``runs/submit`` job definition.
+ context: The Airflow context in which the job is running.
+ inject_parent_job_info: Flag indicating whether to inject parent job
information.
+ inject_transport_info: Flag indicating whether to inject transport
information.
+
+ Returns:
+ The modified job definition with OpenLineage properties injected, if
applicable.
+ """
+ if not inject_parent_job_info and not inject_transport_info:
+ log.debug("Automatic injection of OpenLineage information is
disabled.")
+ return job
+
+ if not _is_openlineage_provider_accessible():
+ log.warning(
+ "Could not access OpenLineage provider for automatic OpenLineage "
+ "properties injection. No action will be performed."
+ )
+ return job
+
+ job = copy.deepcopy(job)
+ new_clusters = _extract_new_clusters_from_databricks_job(job)
+ if not new_clusters:
+ log.warning(
Review Comment:
Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]