mobuchowski commented on code in PR #64513:
URL: https://github.com/apache/airflow/pull/64513#discussion_r3027512101
##########
providers/openlineage/src/airflow/providers/openlineage/utils/spark.py:
##########
@@ -195,3 +195,70 @@ def inject_transport_information_into_spark_properties(properties: dict, context
return properties
return {**properties, **_get_transport_information_as_spark_properties()}
+
+
+def inject_parent_job_information_into_glue_arguments(script_args: dict, context: Context) -> dict:
+ """
+    Inject parent job information into Glue job arguments if not already present.
+
+    Glue jobs pass Spark properties via the ``--conf`` key in the script_args dict.
+    Multiple Spark conf properties are combined into the ``--conf`` key value with ``' --conf '`` as separator between each property assignment.
+
+ Args:
+        script_args: Glue job script arguments dict (maps to boto3 ``Arguments``).
+ context: The context containing task instance information.
+
+ Returns:
+        Modified script_args dict with OpenLineage parent job information injected, if applicable.
+ """
+ existing_conf = script_args.get("--conf", "")
+
+ if "spark.openlineage.parent" in existing_conf:
+ log.info(
            "Some OpenLineage properties with parent job information are already present "
+ "in Glue job arguments. Skipping the injection of OpenLineage "
+ "parent job information into Glue job arguments."
+ )
+ return script_args
+
+ parent_props = _get_parent_job_information_as_spark_properties(context)
+    new_conf_parts = " --conf ".join(f"{k}={v}" for k, v in parent_props.items())
Review Comment:
Also, we can try/except the whole logic - if something fails during the
injection, we should fall back to not doing anything rather than breaking the
job.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]