This is an automated email from the ASF dual-hosted git repository.
o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new beea114b0f1 added parent info injection to EMR operator (#66816)
beea114b0f1 is described below
commit beea114b0f13a0301dc54a361b7a94dc2f702f1f
Author: Rahul Madan <[email protected]>
AuthorDate: Fri May 15 05:47:31 2026 +0530
added parent info injection to EMR operator (#66816)
Signed-off-by: Rahul Madan <[email protected]>
---
.../airflow/providers/amazon/aws/operators/emr.py | 30 ++-
.../amazon/aws/operators/test_emr_containers.py | 228 +++++++++++++++++++++
2 files changed, 257 insertions(+), 1 deletion(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py
b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py
index daeed5f8ced..14d91d61350 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py
@@ -492,6 +492,14 @@ class EmrContainerOperator(AwsBaseOperator[EmrContainerHook]):
:param deferrable: Run operator in the deferrable mode.
:param cancel_on_kill: Flag to indicate whether to cancel the job
when the task is killed while in deferrable mode.
+    :param openlineage_inject_parent_job_info: If True, injects OpenLineage parent job information
+        into the EMR on EKS ``spark-defaults`` configuration so the Spark job emits a
+        ``parentRunFacet`` linking back to the Airflow task. Defaults to the
+        ``openlineage.spark_inject_parent_job_info`` config value.
+    :param openlineage_inject_transport_info: If True, injects OpenLineage transport configuration
+        into the EMR on EKS ``spark-defaults`` configuration so the Spark job sends OL events
+        to the same backend as Airflow. Defaults to the
+        ``openlineage.spark_inject_transport_info`` config value.
"""
aws_hook_class = EmrContainerHook
@@ -522,6 +530,12 @@ class EmrContainerOperator(AwsBaseOperator[EmrContainerHook]):
job_retry_max_attempts: int | None = None,
         deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
cancel_on_kill: bool = True,
+ openlineage_inject_parent_job_info: bool = conf.getboolean(
+ "openlineage", "spark_inject_parent_job_info", fallback=False
+ ),
+ openlineage_inject_transport_info: bool = conf.getboolean(
+ "openlineage", "spark_inject_transport_info", fallback=False
+ ),
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
@@ -540,6 +554,8 @@ class EmrContainerOperator(AwsBaseOperator[EmrContainerHook]):
self.job_id: str | None = None
self.deferrable = deferrable
self.cancel_on_kill = cancel_on_kill
+        self.openlineage_inject_parent_job_info = openlineage_inject_parent_job_info
+        self.openlineage_inject_transport_info = openlineage_inject_transport_info
@property
def _hook_parameters(self):
@@ -547,12 +563,24 @@ class EmrContainerOperator(AwsBaseOperator[EmrContainerHook]):
def execute(self, context: Context) -> str | None:
"""Run job on EMR Containers."""
+        configuration_overrides = self.configuration_overrides
+        if self.openlineage_inject_parent_job_info:
+            self.log.info("Injecting OpenLineage parent job information into EMR on EKS configuration.")
+            configuration_overrides = inject_parent_job_information_into_emr_serverless_properties(
+                configuration_overrides, context
+            )
+        if self.openlineage_inject_transport_info:
+            self.log.info("Injecting OpenLineage transport information into EMR on EKS configuration.")
+            configuration_overrides = inject_transport_information_into_emr_serverless_properties(
+                configuration_overrides, context
+            )
+
self.job_id = self.hook.submit_job(
self.name,
self.execution_role_arn,
self.release_label,
self.job_driver,
- self.configuration_overrides,
+ configuration_overrides,
self.client_request_token,
self.tags,
self.job_retry_max_attempts,
diff --git
a/providers/amazon/tests/unit/amazon/aws/operators/test_emr_containers.py
b/providers/amazon/tests/unit/amazon/aws/operators/test_emr_containers.py
index ac4e9ba1e4f..d18a0f599cb 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_emr_containers.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_emr_containers.py
@@ -209,3 +209,231 @@ class TestEmrEksCreateClusterOperator:
def test_template_fields(self):
validate_template_fields(self.emr_container)
+
+
+class TestEmrContainerOperatorOpenLineageInjection:
+    """Tests for OpenLineage parent job info and transport info injection in EmrContainerOperator."""
+
+ @mock.patch.object(EmrContainerHook, "submit_job")
+ @mock.patch.object(EmrContainerHook, "check_query_status",
return_value="COMPLETED")
+ @mock.patch(
+ "airflow.providers.amazon.aws.operators.emr"
+ ".inject_parent_job_information_into_emr_serverless_properties"
+ )
+ def test_inject_parent_job_info_called_when_enabled(
+ self, mock_inject_parent, mock_check_status, mock_submit_job
+ ):
+ mock_inject_parent.side_effect = lambda overrides, ctx: {
+ "applicationConfiguration": [
+ {
+ "classification": "spark-defaults",
+ "properties": {"spark.openlineage.parentJobNamespace":
"ns"},
+ }
+ ]
+ }
+ mock_submit_job.return_value = "job123"
+
+ operator = EmrContainerOperator(
+ task_id="start_job",
+ name="test_emr_job",
+ virtual_cluster_id="vzw123456",
+ execution_role_arn="arn:aws:somerole",
+ release_label="6.3.0-latest",
+ job_driver={},
+ configuration_overrides={},
+ poll_interval=0,
+ client_request_token=GENERATED_UUID,
+ openlineage_inject_parent_job_info=True,
+ )
+ operator.execute(mock.MagicMock())
+
+ mock_inject_parent.assert_called_once()
+ call_args = mock_submit_job.call_args.args
+ config_overrides = call_args[4]
+ assert (
+ config_overrides["applicationConfiguration"][0]["properties"][
+ "spark.openlineage.parentJobNamespace"
+ ]
+ == "ns"
+ )
+
+ @mock.patch.object(EmrContainerHook, "submit_job")
+ @mock.patch.object(EmrContainerHook, "check_query_status",
return_value="COMPLETED")
+ @mock.patch(
+ "airflow.providers.amazon.aws.operators.emr"
+ ".inject_parent_job_information_into_emr_serverless_properties"
+ )
+ def test_inject_parent_job_info_not_called_when_disabled(
+ self, mock_inject_parent, mock_check_status, mock_submit_job
+ ):
+ mock_submit_job.return_value = "job123"
+
+ operator = EmrContainerOperator(
+ task_id="start_job",
+ name="test_emr_job",
+ virtual_cluster_id="vzw123456",
+ execution_role_arn="arn:aws:somerole",
+ release_label="6.3.0-latest",
+ job_driver={},
+ configuration_overrides={},
+ poll_interval=0,
+ client_request_token=GENERATED_UUID,
+ openlineage_inject_parent_job_info=False,
+ )
+ operator.execute(mock.MagicMock())
+
+ mock_inject_parent.assert_not_called()
+
+ @mock.patch.object(EmrContainerHook, "submit_job")
+ @mock.patch.object(EmrContainerHook, "check_query_status",
return_value="COMPLETED")
+ @mock.patch(
+ "airflow.providers.amazon.aws.operators.emr"
+ ".inject_transport_information_into_emr_serverless_properties"
+ )
+ def test_inject_transport_info_called_when_enabled(
+ self, mock_inject_transport, mock_check_status, mock_submit_job
+ ):
+ mock_inject_transport.side_effect = lambda overrides, ctx: {
+ "applicationConfiguration": [
+ {
+ "classification": "spark-defaults",
+ "properties": {"spark.openlineage.transport.type": "http"},
+ }
+ ]
+ }
+ mock_submit_job.return_value = "job123"
+
+ operator = EmrContainerOperator(
+ task_id="start_job",
+ name="test_emr_job",
+ virtual_cluster_id="vzw123456",
+ execution_role_arn="arn:aws:somerole",
+ release_label="6.3.0-latest",
+ job_driver={},
+ configuration_overrides={},
+ poll_interval=0,
+ client_request_token=GENERATED_UUID,
+ openlineage_inject_transport_info=True,
+ )
+ operator.execute(mock.MagicMock())
+
+ mock_inject_transport.assert_called_once()
+ call_args = mock_submit_job.call_args.args
+ config_overrides = call_args[4]
+ assert (
+            config_overrides["applicationConfiguration"][0]["properties"]["spark.openlineage.transport.type"]
+ == "http"
+ )
+
+ @mock.patch.object(EmrContainerHook, "submit_job")
+ @mock.patch.object(EmrContainerHook, "check_query_status",
return_value="COMPLETED")
+ @mock.patch(
+ "airflow.providers.amazon.aws.operators.emr"
+ ".inject_parent_job_information_into_emr_serverless_properties"
+ )
+ @mock.patch(
+ "airflow.providers.amazon.aws.operators.emr"
+ ".inject_transport_information_into_emr_serverless_properties"
+ )
+ def test_inject_both_parent_and_transport_info(
+ self, mock_inject_transport, mock_inject_parent, mock_check_status,
mock_submit_job
+ ):
+ mock_inject_parent.side_effect = lambda overrides, ctx: {
+ "applicationConfiguration": [
+ {
+ "classification": "spark-defaults",
+ "properties": {"spark.openlineage.parentJobNamespace":
"ns"},
+ }
+ ]
+ }
+ mock_inject_transport.side_effect = lambda overrides, ctx: {
+ "applicationConfiguration": [
+ {
+ "classification": "spark-defaults",
+ "properties": {
+ **overrides.get("applicationConfiguration",
[{}])[0].get("properties", {}),
+ "spark.openlineage.transport.type": "http",
+ },
+ }
+ ]
+ }
+ mock_submit_job.return_value = "job123"
+
+ operator = EmrContainerOperator(
+ task_id="start_job",
+ name="test_emr_job",
+ virtual_cluster_id="vzw123456",
+ execution_role_arn="arn:aws:somerole",
+ release_label="6.3.0-latest",
+ job_driver={},
+ configuration_overrides={},
+ poll_interval=0,
+ client_request_token=GENERATED_UUID,
+ openlineage_inject_parent_job_info=True,
+ openlineage_inject_transport_info=True,
+ )
+ operator.execute(mock.MagicMock())
+
+ mock_inject_parent.assert_called_once()
+ mock_inject_transport.assert_called_once()
+
+ @mock.patch.object(EmrContainerHook, "submit_job")
+ @mock.patch.object(EmrContainerHook, "check_query_status",
return_value="COMPLETED")
+ @mock.patch(
+ "airflow.providers.amazon.aws.operators.emr"
+ ".inject_parent_job_information_into_emr_serverless_properties"
+ )
+ def test_inject_parent_job_info_preserves_existing_config(
+ self, mock_inject_parent, mock_check_status, mock_submit_job
+ ):
+ """Existing configuration_overrides (e.g. monitoringConfiguration) are
preserved."""
+ existing_config = {
+ "monitoringConfiguration": {
+ "cloudWatchMonitoringConfiguration": {
+ "logGroupName": "/aws/emr-eks/jobs",
+ "logStreamNamePrefix": "test",
+ }
+ },
+ "applicationConfiguration": [
+ {"classification": "spark-defaults", "properties":
{"spark.driver.memory": "8G"}}
+ ],
+ }
+ mock_inject_parent.side_effect = lambda overrides, ctx: {
+ **overrides,
+ "applicationConfiguration": [
+ {
+ "classification": "spark-defaults",
+ "properties": {
+
**overrides["applicationConfiguration"][0]["properties"],
+ "spark.openlineage.parentJobNamespace": "ns",
+ },
+ }
+ ],
+ }
+ mock_submit_job.return_value = "job123"
+
+ operator = EmrContainerOperator(
+ task_id="start_job",
+ name="test_emr_job",
+ virtual_cluster_id="vzw123456",
+ execution_role_arn="arn:aws:somerole",
+ release_label="6.3.0-latest",
+ job_driver={},
+ configuration_overrides=existing_config,
+ poll_interval=0,
+ client_request_token=GENERATED_UUID,
+ openlineage_inject_parent_job_info=True,
+ )
+ operator.execute(mock.MagicMock())
+
+ call_args = mock_submit_job.call_args.args
+ config_overrides = call_args[4]
+ # Monitoring config preserved
+ assert (
+            config_overrides["monitoringConfiguration"]["cloudWatchMonitoringConfiguration"]["logGroupName"]
+ == "/aws/emr-eks/jobs"
+ )
+ # OL parent info injected
+ props = config_overrides["applicationConfiguration"][0]["properties"]
+ assert props["spark.openlineage.parentJobNamespace"] == "ns"
+ assert props["spark.driver.memory"] == "8G"