This is an automated email from the ASF dual-hosted git repository.

o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new beea114b0f1 added parent info injection to EMR operator (#66816)
beea114b0f1 is described below

commit beea114b0f13a0301dc54a361b7a94dc2f702f1f
Author: Rahul Madan <[email protected]>
AuthorDate: Fri May 15 05:47:31 2026 +0530

    added parent info injection to EMR operator (#66816)
    
    Signed-off-by: Rahul Madan <[email protected]>
---
 .../airflow/providers/amazon/aws/operators/emr.py  |  30 ++-
 .../amazon/aws/operators/test_emr_containers.py    | 228 +++++++++++++++++++++
 2 files changed, 257 insertions(+), 1 deletion(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py
index daeed5f8ced..14d91d61350 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py
@@ -492,6 +492,14 @@ class 
EmrContainerOperator(AwsBaseOperator[EmrContainerHook]):
     :param deferrable: Run operator in the deferrable mode.
     :param cancel_on_kill: Flag to indicate whether to cancel the job
         when the task is killed while in deferrable mode.
+    :param openlineage_inject_parent_job_info: If True, injects OpenLineage 
parent job information
+        into the EMR on EKS ``spark-defaults`` configuration so the Spark job 
emits a
+        ``parentRunFacet`` linking back to the Airflow task. Defaults to the
+        ``openlineage.spark_inject_parent_job_info`` config value.
+    :param openlineage_inject_transport_info: If True, injects OpenLineage 
transport configuration
+        into the EMR on EKS ``spark-defaults`` configuration so the Spark job 
sends OpenLineage events
+        to the same backend as Airflow. Defaults to the
+        ``openlineage.spark_inject_transport_info`` config value.
     """
 
     aws_hook_class = EmrContainerHook
@@ -522,6 +530,12 @@ class 
EmrContainerOperator(AwsBaseOperator[EmrContainerHook]):
         job_retry_max_attempts: int | None = None,
         deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         cancel_on_kill: bool = True,
+        openlineage_inject_parent_job_info: bool = conf.getboolean(
+            "openlineage", "spark_inject_parent_job_info", fallback=False
+        ),
+        openlineage_inject_transport_info: bool = conf.getboolean(
+            "openlineage", "spark_inject_transport_info", fallback=False
+        ),
         **kwargs: Any,
     ) -> None:
         super().__init__(**kwargs)
@@ -540,6 +554,8 @@ class 
EmrContainerOperator(AwsBaseOperator[EmrContainerHook]):
         self.job_id: str | None = None
         self.deferrable = deferrable
         self.cancel_on_kill = cancel_on_kill
+        self.openlineage_inject_parent_job_info = 
openlineage_inject_parent_job_info
+        self.openlineage_inject_transport_info = 
openlineage_inject_transport_info
 
     @property
     def _hook_parameters(self):
@@ -547,12 +563,24 @@ class 
EmrContainerOperator(AwsBaseOperator[EmrContainerHook]):
 
     def execute(self, context: Context) -> str | None:
         """Run job on EMR Containers."""
+        configuration_overrides = self.configuration_overrides
+        if self.openlineage_inject_parent_job_info:
+            self.log.info("Injecting OpenLineage parent job information into 
EMR on EKS configuration.")
+            configuration_overrides = 
inject_parent_job_information_into_emr_serverless_properties(
+                configuration_overrides, context
+            )
+        if self.openlineage_inject_transport_info:
+            self.log.info("Injecting OpenLineage transport information into 
EMR on EKS configuration.")
+            configuration_overrides = 
inject_transport_information_into_emr_serverless_properties(
+                configuration_overrides, context
+            )
+
         self.job_id = self.hook.submit_job(
             self.name,
             self.execution_role_arn,
             self.release_label,
             self.job_driver,
-            self.configuration_overrides,
+            configuration_overrides,
             self.client_request_token,
             self.tags,
             self.job_retry_max_attempts,
diff --git 
a/providers/amazon/tests/unit/amazon/aws/operators/test_emr_containers.py 
b/providers/amazon/tests/unit/amazon/aws/operators/test_emr_containers.py
index ac4e9ba1e4f..d18a0f599cb 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_emr_containers.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_emr_containers.py
@@ -209,3 +209,231 @@ class TestEmrEksCreateClusterOperator:
 
     def test_template_fields(self):
         validate_template_fields(self.emr_container)
+
+
+class TestEmrContainerOperatorOpenLineageInjection:
+    """Tests for OpenLineage parent job info and transport info injection in 
EmrContainerOperator."""
+
+    @mock.patch.object(EmrContainerHook, "submit_job")
+    @mock.patch.object(EmrContainerHook, "check_query_status", 
return_value="COMPLETED")
+    @mock.patch(
+        "airflow.providers.amazon.aws.operators.emr"
+        ".inject_parent_job_information_into_emr_serverless_properties"
+    )
+    def test_inject_parent_job_info_called_when_enabled(
+        self, mock_inject_parent, mock_check_status, mock_submit_job
+    ):
+        mock_inject_parent.side_effect = lambda overrides, ctx: {
+            "applicationConfiguration": [
+                {
+                    "classification": "spark-defaults",
+                    "properties": {"spark.openlineage.parentJobNamespace": 
"ns"},
+                }
+            ]
+        }
+        mock_submit_job.return_value = "job123"
+
+        operator = EmrContainerOperator(
+            task_id="start_job",
+            name="test_emr_job",
+            virtual_cluster_id="vzw123456",
+            execution_role_arn="arn:aws:somerole",
+            release_label="6.3.0-latest",
+            job_driver={},
+            configuration_overrides={},
+            poll_interval=0,
+            client_request_token=GENERATED_UUID,
+            openlineage_inject_parent_job_info=True,
+        )
+        operator.execute(mock.MagicMock())
+
+        mock_inject_parent.assert_called_once()
+        call_args = mock_submit_job.call_args.args
+        config_overrides = call_args[4]
+        assert (
+            config_overrides["applicationConfiguration"][0]["properties"][
+                "spark.openlineage.parentJobNamespace"
+            ]
+            == "ns"
+        )
+
+    @mock.patch.object(EmrContainerHook, "submit_job")
+    @mock.patch.object(EmrContainerHook, "check_query_status", 
return_value="COMPLETED")
+    @mock.patch(
+        "airflow.providers.amazon.aws.operators.emr"
+        ".inject_parent_job_information_into_emr_serverless_properties"
+    )
+    def test_inject_parent_job_info_not_called_when_disabled(
+        self, mock_inject_parent, mock_check_status, mock_submit_job
+    ):
+        mock_submit_job.return_value = "job123"
+
+        operator = EmrContainerOperator(
+            task_id="start_job",
+            name="test_emr_job",
+            virtual_cluster_id="vzw123456",
+            execution_role_arn="arn:aws:somerole",
+            release_label="6.3.0-latest",
+            job_driver={},
+            configuration_overrides={},
+            poll_interval=0,
+            client_request_token=GENERATED_UUID,
+            openlineage_inject_parent_job_info=False,
+        )
+        operator.execute(mock.MagicMock())
+
+        mock_inject_parent.assert_not_called()
+
+    @mock.patch.object(EmrContainerHook, "submit_job")
+    @mock.patch.object(EmrContainerHook, "check_query_status", 
return_value="COMPLETED")
+    @mock.patch(
+        "airflow.providers.amazon.aws.operators.emr"
+        ".inject_transport_information_into_emr_serverless_properties"
+    )
+    def test_inject_transport_info_called_when_enabled(
+        self, mock_inject_transport, mock_check_status, mock_submit_job
+    ):
+        mock_inject_transport.side_effect = lambda overrides, ctx: {
+            "applicationConfiguration": [
+                {
+                    "classification": "spark-defaults",
+                    "properties": {"spark.openlineage.transport.type": "http"},
+                }
+            ]
+        }
+        mock_submit_job.return_value = "job123"
+
+        operator = EmrContainerOperator(
+            task_id="start_job",
+            name="test_emr_job",
+            virtual_cluster_id="vzw123456",
+            execution_role_arn="arn:aws:somerole",
+            release_label="6.3.0-latest",
+            job_driver={},
+            configuration_overrides={},
+            poll_interval=0,
+            client_request_token=GENERATED_UUID,
+            openlineage_inject_transport_info=True,
+        )
+        operator.execute(mock.MagicMock())
+
+        mock_inject_transport.assert_called_once()
+        call_args = mock_submit_job.call_args.args
+        config_overrides = call_args[4]
+        assert (
+            
config_overrides["applicationConfiguration"][0]["properties"]["spark.openlineage.transport.type"]
+            == "http"
+        )
+
+    @mock.patch.object(EmrContainerHook, "submit_job")
+    @mock.patch.object(EmrContainerHook, "check_query_status", 
return_value="COMPLETED")
+    @mock.patch(
+        "airflow.providers.amazon.aws.operators.emr"
+        ".inject_parent_job_information_into_emr_serverless_properties"
+    )
+    @mock.patch(
+        "airflow.providers.amazon.aws.operators.emr"
+        ".inject_transport_information_into_emr_serverless_properties"
+    )
+    def test_inject_both_parent_and_transport_info(
+        self, mock_inject_transport, mock_inject_parent, mock_check_status, 
mock_submit_job
+    ):
+        mock_inject_parent.side_effect = lambda overrides, ctx: {
+            "applicationConfiguration": [
+                {
+                    "classification": "spark-defaults",
+                    "properties": {"spark.openlineage.parentJobNamespace": 
"ns"},
+                }
+            ]
+        }
+        mock_inject_transport.side_effect = lambda overrides, ctx: {
+            "applicationConfiguration": [
+                {
+                    "classification": "spark-defaults",
+                    "properties": {
+                        **overrides.get("applicationConfiguration", 
[{}])[0].get("properties", {}),
+                        "spark.openlineage.transport.type": "http",
+                    },
+                }
+            ]
+        }
+        mock_submit_job.return_value = "job123"
+
+        operator = EmrContainerOperator(
+            task_id="start_job",
+            name="test_emr_job",
+            virtual_cluster_id="vzw123456",
+            execution_role_arn="arn:aws:somerole",
+            release_label="6.3.0-latest",
+            job_driver={},
+            configuration_overrides={},
+            poll_interval=0,
+            client_request_token=GENERATED_UUID,
+            openlineage_inject_parent_job_info=True,
+            openlineage_inject_transport_info=True,
+        )
+        operator.execute(mock.MagicMock())
+
+        mock_inject_parent.assert_called_once()
+        mock_inject_transport.assert_called_once()
+
+    @mock.patch.object(EmrContainerHook, "submit_job")
+    @mock.patch.object(EmrContainerHook, "check_query_status", 
return_value="COMPLETED")
+    @mock.patch(
+        "airflow.providers.amazon.aws.operators.emr"
+        ".inject_parent_job_information_into_emr_serverless_properties"
+    )
+    def test_inject_parent_job_info_preserves_existing_config(
+        self, mock_inject_parent, mock_check_status, mock_submit_job
+    ):
+        """Existing configuration_overrides (e.g. monitoringConfiguration) are 
preserved."""
+        existing_config = {
+            "monitoringConfiguration": {
+                "cloudWatchMonitoringConfiguration": {
+                    "logGroupName": "/aws/emr-eks/jobs",
+                    "logStreamNamePrefix": "test",
+                }
+            },
+            "applicationConfiguration": [
+                {"classification": "spark-defaults", "properties": 
{"spark.driver.memory": "8G"}}
+            ],
+        }
+        mock_inject_parent.side_effect = lambda overrides, ctx: {
+            **overrides,
+            "applicationConfiguration": [
+                {
+                    "classification": "spark-defaults",
+                    "properties": {
+                        
**overrides["applicationConfiguration"][0]["properties"],
+                        "spark.openlineage.parentJobNamespace": "ns",
+                    },
+                }
+            ],
+        }
+        mock_submit_job.return_value = "job123"
+
+        operator = EmrContainerOperator(
+            task_id="start_job",
+            name="test_emr_job",
+            virtual_cluster_id="vzw123456",
+            execution_role_arn="arn:aws:somerole",
+            release_label="6.3.0-latest",
+            job_driver={},
+            configuration_overrides=existing_config,
+            poll_interval=0,
+            client_request_token=GENERATED_UUID,
+            openlineage_inject_parent_job_info=True,
+        )
+        operator.execute(mock.MagicMock())
+
+        call_args = mock_submit_job.call_args.args
+        config_overrides = call_args[4]
+        # Monitoring config preserved
+        assert (
+            
config_overrides["monitoringConfiguration"]["cloudWatchMonitoringConfiguration"]["logGroupName"]
+            == "/aws/emr-eks/jobs"
+        )
+        # OL parent info injected
+        props = config_overrides["applicationConfiguration"][0]["properties"]
+        assert props["spark.openlineage.parentJobNamespace"] == "ns"
+        assert props["spark.driver.memory"] == "8G"

Reply via email to