kacpermuda commented on code in PR #67894:
URL: https://github.com/apache/airflow/pull/67894#discussion_r3349524274


##########
providers/databricks/src/airflow/providers/databricks/utils/openlineage.py:
##########
@@ -320,3 +326,128 @@ def emit_openlineage_events_for_databricks_queries(
 
     log.info("OpenLineage has successfully finished processing information about Databricks queries.")
     return
+
+
+def _is_openlineage_provider_accessible() -> bool:
+    """
+    Check if the OpenLineage provider is accessible.
+
+    This function attempts to import the necessary OpenLineage modules and checks if the provider
+    is enabled and the listener is available.
+
+    Returns:
+        True if the OpenLineage provider is accessible, False otherwise.
+    """
+    try:
+        from airflow.providers.openlineage.conf import is_disabled
+        from airflow.providers.openlineage.plugins.listener import get_openlineage_listener
+    except ImportError:
+        log.debug("OpenLineage provider could not be imported.")
+        return False
+
+    if is_disabled():
+        log.debug("OpenLineage provider is disabled.")
+        return False
+
+    if not get_openlineage_listener():
+        log.debug("OpenLineage listener could not be found.")
+        return False
+
+    return True
+
+
+def _extract_new_clusters_from_databricks_job(job: dict) -> list[dict]:
+    """
+    Collect every ``new_cluster`` definition that can carry Spark properties in a Databricks job.
+
+    A ``runs/submit`` payload can define a ``new_cluster`` in three places: at the top level
+    (single-task form), inline on each task (``tasks[].new_cluster``), or as a shared job cluster
+    (``job_clusters[].new_cluster``) referenced by tasks through ``job_cluster_key``. Tasks running on
+    an ``existing_cluster_id`` have no ``new_cluster`` to mutate and are skipped.
+
+    Args:
+        job: The Databricks ``runs/submit`` job definition.
+
+    Returns:
+        The list of ``new_cluster`` dicts found in the job definition.
+    """
+    new_clusters = []
+    if isinstance(job.get("new_cluster"), dict):
+        new_clusters.append(job["new_cluster"])
+    for key in ("tasks", "job_clusters"):
+        if isinstance(job.get(key), list):
+            new_clusters.extend(
+                item["new_cluster"] for item in job[key] if isinstance(item.get("new_cluster"), dict)
+            )
+    return new_clusters
+
+
+def inject_openlineage_properties_into_databricks_job(
+    job: dict, context: Context, inject_parent_job_info: bool, inject_transport_info: bool
+) -> dict:
+    """
+    Inject OpenLineage properties into a Databricks job definition.
+
+    This function does not remove existing configurations or modify the job definition in any way,
+     except to add the required OpenLineage properties if they are not already present.
+
+    The entire properties injection process will be skipped if any condition is met:
+        - The OpenLineage provider is not accessible.
+        - The job has no ``new_cluster`` definition to inject Spark properties into (e.g. it only uses
+          an ``existing_cluster_id``, whose Spark configuration is fixed at cluster creation time).
+        - Both `inject_parent_job_info` and `inject_transport_info` are set to False.
+
+    Additionally, specific information will not be injected if relevant OpenLineage properties already
+    exist.
+
+    Parent job information will not be injected if:
+        - Any property prefixed with `spark.openlineage.parent` exists.
+        - `inject_parent_job_info` is False.
+    Transport information will not be injected if:
+        - Any property prefixed with `spark.openlineage.transport` exists.
+        - `inject_transport_info` is False.
+
+    Args:
+        job: The original Databricks ``runs/submit`` job definition.
+        context: The Airflow context in which the job is running.
+        inject_parent_job_info: Flag indicating whether to inject parent job information.
+        inject_transport_info: Flag indicating whether to inject transport information.
+
+    Returns:
+        The modified job definition with OpenLineage properties injected, if applicable.
+    """
+    if not inject_parent_job_info and not inject_transport_info:
+        log.debug("Automatic injection of OpenLineage information is disabled.")
+        return job
+
+    if not _is_openlineage_provider_accessible():
+        log.warning(
+            "Could not access OpenLineage provider for automatic OpenLineage "
+            "properties injection. No action will be performed."
+        )
+        return job
+
+    job = copy.deepcopy(job)
+    new_clusters = _extract_new_clusters_from_databricks_job(job)
+    if not new_clusters:
+        log.warning(

Review Comment:
   Any idea how often / when that could happen? Wondering if we should be 
displaying a warning here, when it might happen legitimately in some user path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to