kalluripradeep commented on code in PR #64391:
URL: https://github.com/apache/airflow/pull/64391#discussion_r3032941548


##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -545,9 +551,41 @@ def _resolve_kerberos_principal(self, principal: str | None) -> str:
             func = kerberos.get_kerberos_principal
         except AttributeError:
             # Fallback for older versions of Airflow
-            func = kerberos.get_kerberos_principle  # type: ignore[attr-defined]
+            func = getattr(kerberos, "get_kerberos_principle")
         return func(principal)
 
+    def _run_post_submit_commands(self) -> None:
+        """
+        Run any post-submit shell commands configured on this hook.
+
+        Called after the Spark job finishes (success or on_kill). Typical use case
+        is killing sidecars like Istio that don't shut down automatically.
+        Failures are logged as warnings and never raise.
+        """
+        for cmd in self._post_submit_commands:
+            self.log.info("Running post-submit command: %s", cmd)
+            try:
+                result = subprocess.run(
+                    cmd,
+                    shell=True,
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.STDOUT,
+                    text=True,
+                    check=False,
+                    timeout=30,
+                )

Review Comment:
   Correct — these commands run on the Airflow worker pod, not on the Spark 
executor pods. The primary use-case is when Airflow itself runs inside a 
Kubernetes pod that has an Istio/Envoy sidecar injected alongside the worker 
container. `localhost:15020/quitquitquit` targets that sidecar on the same pod, 
signalling it to shut down cleanly after the Spark job finishes.
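
   As a sketch of what that configuration might look like (parameter name as in this PR; `15020` is the default port of Istio's pilot-agent, which serves the `/quitquitquit` shutdown endpoint):

   ```python
   import shlex

   # Sketch: the hook-side cleanup command described above. The endpoint is only
   # reachable from containers in the same pod as the Envoy sidecar.
   post_submit_commands = ["curl -X POST localhost:15020/quitquitquit"]

   # Each entry is a single command string; splitting one shows the argv
   # the Airflow worker would end up executing.
   argv = shlex.split(post_submit_commands[0])
   ```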



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -237,6 +242,7 @@ def __init__(
         self._driver_status: str | None = None
         self._spark_exit_code: int | None = None
         self._env: dict[str, Any] | None = None
+        self._post_submit_commands: list[str] = post_submit_commands or []

Review Comment:
   Fixed — now stored as `list(post_submit_commands) if post_submit_commands 
else []` so the hook keeps its own independent copy and mutations by the caller 
do not affect its behaviour.
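
   A minimal sketch of the defensive-copy pattern described above (function name is illustrative, not the hook's actual code):

   ```python
   def store_post_submit_commands(post_submit_commands=None) -> list:
       # list(...) takes an independent copy; any falsy value falls back to [].
       return list(post_submit_commands) if post_submit_commands else []

   caller_cmds = ["echo hello"]
   stored = store_post_submit_commands(caller_cmds)
   caller_cmds.append("echo mutated")  # does not reach the stored copy
   ```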



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -545,9 +551,41 @@ def _resolve_kerberos_principal(self, principal: str | None) -> str:
             func = kerberos.get_kerberos_principal
         except AttributeError:
             # Fallback for older versions of Airflow
-            func = kerberos.get_kerberos_principle  # type: ignore[attr-defined]
+            func = getattr(kerberos, "get_kerberos_principle")
         return func(principal)
 
+    def _run_post_submit_commands(self) -> None:
+        """
+        Run any post-submit shell commands configured on this hook.
+
+        Called after the Spark job finishes (success or on_kill). Typical use case
+        is killing sidecars like Istio that don't shut down automatically.
+        Failures are logged as warnings and never raise.
+        """
+        for cmd in self._post_submit_commands:
+            self.log.info("Running post-submit command: %s", cmd)
+            try:
+                result = subprocess.run(
+                    cmd,
+                    shell=True,
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.STDOUT,
+                    text=True,
+                    check=False,
+                    timeout=30,
+                )

Review Comment:
   Fixed — replaced `shell=True` with `shell=False` and `shlex.split(cmd)` to 
tokenise the command string safely without spawning a shell process.
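
   A self-contained sketch of that change (helper name is illustrative):

   ```python
   import shlex
   import subprocess

   def run_post_submit_command(cmd: str, timeout: int = 30) -> subprocess.CompletedProcess:
       # shlex.split tokenises the string and shell=False avoids spawning
       # /bin/sh, so shell metacharacters in cmd are passed through literally
       # instead of being interpreted.
       return subprocess.run(
           shlex.split(cmd),
           shell=False,
           stdout=subprocess.PIPE,
           stderr=subprocess.STDOUT,
           text=True,
           check=False,
           timeout=timeout,
       )

   result = run_post_submit_command("echo hello")
   ```

   One trade-off worth noting: without a shell, pipelines and redirections inside a command string no longer work; each entry must be a single executable plus its arguments.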



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -545,9 +551,41 @@ def _resolve_kerberos_principal(self, principal: str | None) -> str:
             func = kerberos.get_kerberos_principal
         except AttributeError:
             # Fallback for older versions of Airflow
-            func = kerberos.get_kerberos_principle  # type: ignore[attr-defined]
+            func = getattr(kerberos, "get_kerberos_principle")
         return func(principal)
 
+    def _run_post_submit_commands(self) -> None:
+        """
+        Run any post-submit shell commands configured on this hook.
+
+        Called after the Spark job finishes (success or on_kill). Typical use case
+        is killing sidecars like Istio that don't shut down automatically.
+        Failures are logged as warnings and never raise.
+        """
+        for cmd in self._post_submit_commands:
+            self.log.info("Running post-submit command: %s", cmd)
+            try:
+                result = subprocess.run(
+                    cmd,
+                    shell=True,
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.STDOUT,
+                    text=True,
+                    check=False,
+                    timeout=30,
+                )
+                self.log.info("Post-submit command output:\n%s", result.stdout)
+                if result.returncode != 0:

Review Comment:
   Fixed — logging is now at `DEBUG` level and stdout output is truncated to 
the first 2,000 characters to keep task logs clean.
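
   A sketch of that logging change (helper name and logger name are illustrative):

   ```python
   import logging

   log = logging.getLogger("spark_submit")

   def log_post_submit_output(stdout: str, max_chars: int = 2000) -> str:
       # DEBUG keeps routine command output out of normal task logs, and the
       # slice bounds how much a chatty command can add to them.
       truncated = stdout[:max_chars]
       log.debug("Post-submit command output:\n%s", truncated)
       return truncated
   ```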



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -605,6 +643,7 @@ def submit(self, application: str = "", **kwargs: Any) -> None:
                 raise AirflowException(
                     f"ERROR : Driver {self._driver_id} badly exited with status {self._driver_status}"
                 )
+        self._run_post_submit_commands()

Review Comment:
   Fixed — the call is now inside a `try/finally` block so 
`_run_post_submit_commands()` always runs regardless of whether an 
`AirflowException` is raised on the success or failure path.
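
   A minimal sketch of that control flow (function names illustrative, not the hook's actual code):

   ```python
   def submit_with_cleanup(submit, run_post_submit_commands):
       # finally guarantees the cleanup runs on success, failure, and kill
       # paths alike, even when submit() raises.
       try:
           submit()
       finally:
           run_post_submit_commands()

   ran = []

   def failing_submit():
       raise RuntimeError("Driver badly exited")

   try:
       submit_with_cleanup(failing_submit, lambda: ran.append("post-submit"))
   except RuntimeError:
       pass  # the exception still propagates; cleanup already ran
   ```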



##########
providers/apache/spark/tests/unit/apache/spark/hooks/test_spark_submit_post_commands.py:
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Tests for the post_submit_commands feature added to SparkSubmitHook.
+Issue: https://github.com/apache/airflow/issues/50958
+"""
+
+from __future__ import annotations
+
+import subprocess
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from airflow.exceptions import AirflowException
+from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook
+
+
+def _make_hook(post_submit_commands=None, **extra):
+    """Build a SparkSubmitHook with a mocked connection."""
+
+    with (
+        patch.object(SparkSubmitHook, "_resolve_connection") as mock_conn,
+        patch.object(SparkSubmitHook, "_resolve_should_track_driver_status", return_value=False),
+    ):
+        mock_conn.return_value = {
+            "master": "local",
+            "queue": None,
+            "deploy_mode": "client",
+            "spark_binary": "spark-submit",
+            "namespace": None,
+            "principal": None,
+            "keytab": None,
+        }
+        hook = SparkSubmitHook(
+            conn_id="spark_default",
+            post_submit_commands=post_submit_commands,
+            **extra,
+        )
+    return hook
+
+
+class TestNoPostSubmitCommands:
+    def test_defaults_to_empty_list(self):
+        hook = _make_hook()
+        assert hook._post_submit_commands == []
+
+    def test_run_post_submit_commands_is_noop_when_empty(self):
+        hook = _make_hook()
+        with patch("subprocess.run") as mock_run:
+            hook._run_post_submit_commands()
+            mock_run.assert_not_called()
+
+
+class TestSingleCommand:
+    def test_command_is_stored(self):
+        hook = _make_hook(post_submit_commands=["echo hello"])
+        assert hook._post_submit_commands == ["echo hello"]
+
+    def test_command_is_executed_via_shell(self):
+        hook = _make_hook(post_submit_commands=["echo hello"])
+        mock_result = MagicMock()
+        mock_result.returncode = 0
+        mock_result.stdout = "hello\n"
+

Review Comment:
   Fixed — all new subprocess mocks now use 
`MagicMock(spec=subprocess.CompletedProcess)` so any access to an attribute 
that doesn't exist on `CompletedProcess` raises `AttributeError` rather than 
silently returning another mock.
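
   One way to get that behaviour, sketched below. Note an assumption in this sketch: `CompletedProcess` assigns `returncode`/`stdout` in `__init__`, so spec'ing against an *instance* (rather than the class) is what makes those attributes visible to the spec while still rejecting typos:

   ```python
   import subprocess
   from unittest.mock import MagicMock

   # Spec from an instance so instance attributes set in __init__ are included.
   template = subprocess.CompletedProcess(args=["echo"], returncode=0, stdout="")
   mock_result = MagicMock(spec=template)
   mock_result.returncode = 0
   mock_result.stdout = "hello\n"

   # An attribute CompletedProcess doesn't have now raises instead of
   # silently returning a child mock.
   try:
       mock_result.return_code  # typo for "returncode"
       typo_raised = False
   except AttributeError:
       typo_raised = True
   ```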



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -94,6 +94,10 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
                         (will overwrite any deployment mode defined in the connection's extra JSON)
     :param use_krb5ccache: if True, configure spark to use ticket cache instead of relying
         on keytab for Kerberos login
+    :param post_submit_commands: Optional list of shell commands to run after the Spark
+        job finishes (on both success and on_kill). Useful for cleaning up sidecars such
+        as Istio (e.g. ``["curl -X POST localhost:15020/quitquitquit"]``). Each command
+        is executed via the shell; failures produce a warning but do not fail the task.

Review Comment:
   Fixed — `post_submit_commands=self._post_submit_commands` is now passed 
through in `SparkSubmitOperator._get_hook()`, and the parameter is wired 
through `__init__` and the operator's docstring.
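
   The shape of that pass-through, sketched with stand-in classes (not the real Airflow operator/hook) to show why it matters: if `_get_hook()` does not forward the parameter, the hook silently falls back to its default:

   ```python
   class StubSparkSubmitHook:
       def __init__(self, post_submit_commands=None):
           self._post_submit_commands = (
               list(post_submit_commands) if post_submit_commands else []
           )

   class StubSparkSubmitOperator:
       def __init__(self, post_submit_commands=None):
           self._post_submit_commands = post_submit_commands

       def _get_hook(self):
           # The wiring the review asked for: forward the operator's value.
           return StubSparkSubmitHook(post_submit_commands=self._post_submit_commands)

   hook = StubSparkSubmitOperator(
       post_submit_commands=["curl -X POST localhost:15020/quitquitquit"]
   )._get_hook()
   ```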



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
