Copilot commented on code in PR #64391:
URL: https://github.com/apache/airflow/pull/64391#discussion_r3025334196
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -237,6 +242,7 @@ def __init__(
self._driver_status: str | None = None
self._spark_exit_code: int | None = None
self._env: dict[str, Any] | None = None
+ self._post_submit_commands: list[str] = post_submit_commands or []
Review Comment:
`post_submit_commands` is stored by reference whenever the caller passes a
non-empty list (`post_submit_commands or []` only creates a new list for falsy
input). If the caller mutates that list after hook construction, the hook's
behavior changes unexpectedly. Consider copying/coercing to a new list (e.g.,
`list(post_submit_commands)` when provided) and potentially validating the
element types (strings).
```suggestion
self._post_submit_commands: list[str] = (
list(post_submit_commands) if post_submit_commands is not None
else []
)
```
##########
providers/apache/spark/tests/unit/apache/spark/hooks/test_spark_submit_post_commands.py:
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Tests for the post_submit_commands feature added to SparkSubmitHook.
+Issue: https://github.com/apache/airflow/issues/50958
+"""
+
+from __future__ import annotations
+
+import subprocess
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from airflow.exceptions import AirflowException
+from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook
+
+
+def _make_hook(post_submit_commands=None, **extra):
+ """Build a SparkSubmitHook with a mocked connection."""
+
+ with (
+ patch.object(SparkSubmitHook, "_resolve_connection") as mock_conn,
+ patch.object(SparkSubmitHook, "_resolve_should_track_driver_status",
return_value=False),
+ ):
+ mock_conn.return_value = {
+ "master": "local",
+ "queue": None,
+ "deploy_mode": "client",
+ "spark_binary": "spark-submit",
+ "namespace": None,
+ "principal": None,
+ "keytab": None,
+ }
+ hook = SparkSubmitHook(
+ conn_id="spark_default",
+ post_submit_commands=post_submit_commands,
+ **extra,
+ )
+ return hook
+
+
+class TestNoPostSubmitCommands:
+ def test_defaults_to_empty_list(self):
+ hook = _make_hook()
+ assert hook._post_submit_commands == []
+
+ def test_run_post_submit_commands_is_noop_when_empty(self):
+ hook = _make_hook()
+ with patch("subprocess.run") as mock_run:
+ hook._run_post_submit_commands()
+ mock_run.assert_not_called()
+
+
+class TestSingleCommand:
+ def test_command_is_stored(self):
+ hook = _make_hook(post_submit_commands=["echo hello"])
+ assert hook._post_submit_commands == ["echo hello"]
+
+ def test_command_is_executed_via_shell(self):
+ hook = _make_hook(post_submit_commands=["echo hello"])
+ mock_result = MagicMock()
+ mock_result.returncode = 0
+ mock_result.stdout = "hello\n"
+
Review Comment:
Tests rely heavily on bare `MagicMock()` instances for
`CompletedProcess`/Popen-like objects. Using real `subprocess.CompletedProcess`
(or `MagicMock(spec=subprocess.CompletedProcess)` / autospecced mocks) would
better match the real API and reduce the chance of tests passing with incorrect
attributes/behaviors.
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -545,9 +551,41 @@ def _resolve_kerberos_principal(self, principal: str |
None) -> str:
func = kerberos.get_kerberos_principal
except AttributeError:
# Fallback for older versions of Airflow
- func = kerberos.get_kerberos_principle # type:
ignore[attr-defined]
+ func = getattr(kerberos, "get_kerberos_principle")
return func(principal)
+ def _run_post_submit_commands(self) -> None:
+ """
+ Run any post-submit shell commands configured on this hook.
+
+ Called after the Spark job finishes (success or on_kill). Typical use
case
+ is killing sidecars like Istio that don't shut down automatically.
+ Failures are logged as warnings and never raise.
+ """
+ for cmd in self._post_submit_commands:
+ self.log.info("Running post-submit command: %s", cmd)
+ try:
+ result = subprocess.run(
+ cmd,
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ text=True,
+ check=False,
+ timeout=30,
+ )
+ self.log.info("Post-submit command output:\n%s", result.stdout)
+ if result.returncode != 0:
Review Comment:
`_run_post_submit_commands()` logs the full command and its full stdout at
INFO. Post-submit commands may include secrets (e.g., headers/tokens) and
stdout can be large/noisy. Consider logging at DEBUG, truncating output, and/or
redacting sensitive values to reduce accidental leakage and log volume.
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -605,6 +643,7 @@ def submit(self, application: str = "", **kwargs: Any) ->
None:
raise AirflowException(
f"ERROR : Driver {self._driver_id} badly exited with
status {self._driver_status}"
)
+ self._run_post_submit_commands()
Review Comment:
`submit()` calls `_run_post_submit_commands()` only on the success path. Any
`AirflowException` raised for a non-zero return code or an unsuccessful driver
status will bypass the post-submit cleanup entirely. If the intent is to run
the commands after the Spark job finishes regardless of success or failure,
wrap the body in `try/finally` (ensuring the original exception still
propagates); otherwise, clarify in the docstring/param docs that the commands
run only after a successful submit and during on_kill.
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -545,9 +551,41 @@ def _resolve_kerberos_principal(self, principal: str |
None) -> str:
func = kerberos.get_kerberos_principal
except AttributeError:
# Fallback for older versions of Airflow
- func = kerberos.get_kerberos_principle # type:
ignore[attr-defined]
+ func = getattr(kerberos, "get_kerberos_principle")
return func(principal)
+ def _run_post_submit_commands(self) -> None:
+ """
+ Run any post-submit shell commands configured on this hook.
+
+ Called after the Spark job finishes (success or on_kill). Typical use
case
+ is killing sidecars like Istio that don't shut down automatically.
+ Failures are logged as warnings and never raise.
+ """
+ for cmd in self._post_submit_commands:
+ self.log.info("Running post-submit command: %s", cmd)
+ try:
+ result = subprocess.run(
+ cmd,
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ text=True,
+ check=False,
+ timeout=30,
+ )
Review Comment:
Running arbitrary strings with `subprocess.run(..., shell=True)` introduces
avoidable shell-injection risk and makes quoting/escaping behavior
shell-dependent. If possible, run commands with `shell=False` (e.g., accept
`Sequence[str]` argv or parse strings with `shlex.split`) and only opt into
`shell=True` when explicitly requested.
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -94,6 +94,10 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
(will overwrite any deployment mode defined in the
connection's extra JSON)
:param use_krb5ccache: if True, configure spark to use ticket cache
instead of relying
on keytab for Kerberos login
+ :param post_submit_commands: Optional list of shell commands to run after
the Spark
+ job finishes (on both success and on_kill). Useful for cleaning up
sidecars such
+ as Istio (e.g. ``["curl -X POST localhost:15020/quitquitquit"]``).
Each command
+ is executed via the shell; failures produce a warning but do not fail
the task.
Review Comment:
`post_submit_commands` is added to `SparkSubmitHook`, but
`SparkSubmitOperator._get_hook()` currently doesn’t pass or expose this
parameter, so DAG authors using the operator can’t configure the new behavior.
Consider adding a matching operator argument and wiring it through to the hook.
##########
providers/apache/spark/tests/unit/apache/spark/hooks/test_spark_submit_post_commands.py:
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Tests for the post_submit_commands feature added to SparkSubmitHook.
+Issue: https://github.com/apache/airflow/issues/50958
+"""
+
Review Comment:
The module docstring includes a direct issue link/reference. In the Spark
provider unit tests, module docstrings generally describe behavior without
linking to tracking issues (e.g., `test_spark_submit.py:1-35`). Consider
removing the issue reference to keep tests focused on behavior rather than
ticket tracking.
```suggestion
Tests the behavior of the post_submit_commands feature in SparkSubmitHook.
"""
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]