This is an automated email from the ASF dual-hosted git repository.
sandy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b6993cbbcaa0 [SPARK-53516][SDP] Fix `spark.api.mode` arg process in
SparkPipelines
b6993cbbcaa0 is described below
commit b6993cbbcaa0b175019115c665fc081c3d65a7ed
Author: Cheng Pan <[email protected]>
AuthorDate: Tue Sep 23 08:58:53 2025 -0700
[SPARK-53516][SDP] Fix `spark.api.mode` arg process in SparkPipelines
### What changes were proposed in this pull request?
This PR fixes the following issues:
- Trim the value of `spark.api.mode` before evaluation
- The value of `spark.api.mode` should be case insensitive
- Support both `-c spark.api.mode=xxx` and `--conf spark.api.mode=xxx`
- Avoid duplicated `--conf spark.api.mode=connect` args in the final
generated commands
### Why are the changes needed?
Bug fix.
### Does this PR introduce _any_ user-facing change?
No, SDP is an unreleased feature.
### How was this patch tested?
UT is added.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52261 from pan3793/SPARK-53516.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Sandy Ryza <[email protected]>
---
.../org/apache/spark/deploy/SparkPipelines.scala | 23 ++++----
.../apache/spark/deploy/SparkPipelinesSuite.scala | 66 ++++++++++++++++++++++
2 files changed, 79 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala b/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala
index 8c96598e7a9d..713937cadabf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala
@@ -18,12 +18,14 @@
package org.apache.spark.deploy
import java.util
+import java.util.Locale
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
import org.apache.spark.SparkUserAppException
import org.apache.spark.internal.Logging
+import org.apache.spark.launcher.SparkLauncher.SPARK_API_MODE
import org.apache.spark.launcher.SparkSubmitArgumentsParser
import org.apache.spark.util.SparkExitCode
@@ -63,16 +65,17 @@ object SparkPipelines extends Logging {
if (opt == "--remote") {
remote = value
} else if (opt == "--class") {
- logInfo("--class argument not supported.")
- throw SparkUserAppException(SparkExitCode.EXIT_FAILURE)
- } else if (opt == "--conf" &&
- value.startsWith("spark.api.mode=") &&
- value != "spark.api.mode=connect") {
- logInfo(
- "--spark.api.mode must be 'connect'. " +
- "Declarative Pipelines currently only supports Spark Connect."
- )
+ logError("--class argument not supported.")
throw SparkUserAppException(SparkExitCode.EXIT_FAILURE)
+ } else if ((opt == "--conf" || opt == "-c") && value.startsWith(s"$SPARK_API_MODE=")) {
+ val apiMode = value.stripPrefix(s"$SPARK_API_MODE=").trim
+ if (apiMode.toLowerCase(Locale.ROOT) != "connect") {
+ logError(
+ s"$SPARK_API_MODE must be 'connect' (was '$apiMode'). " +
+ "Declarative Pipelines currently only supports Spark Connect."
+ )
+ throw SparkUserAppException(SparkExitCode.EXIT_FAILURE)
+ }
} else if (Seq("--name", "-h", "--help").contains(opt)) {
pipelinesArgs += opt
if (value != null && value.nonEmpty) {
@@ -99,7 +102,7 @@ object SparkPipelines extends Logging {
}
sparkSubmitArgs += "--conf"
- sparkSubmitArgs += "spark.api.mode=connect"
+ sparkSubmitArgs += s"$SPARK_API_MODE=connect"
sparkSubmitArgs += "--remote"
sparkSubmitArgs += remote
(sparkSubmitArgs.toSeq, pipelinesArgs.toSeq)
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala
index 2d6a1e083604..55f59d7c856d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala
@@ -117,6 +117,72 @@ class SparkPipelinesSuite extends SparkSubmitTestUtils with BeforeAndAfterEach {
}
}
+ test("spark.api.mode arg") {
+ var args = Array("--conf", "spark.api.mode=classic")
+ intercept[SparkUserAppException] {
+ SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc")
+ }
+ args = Array("-c", "spark.api.mode=classic")
+ intercept[SparkUserAppException] {
+ SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc")
+ }
+ args = Array("--conf", "spark.api.mode=CONNECT")
+ assert(
+ SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+ Seq(
+ "--conf",
+ "spark.api.mode=connect",
+ "--remote",
+ "local",
+ "abc/python/pyspark/pipelines/cli.py"
+ )
+ )
+ args = Array("--conf", "spark.api.mode=CoNNect")
+ assert(
+ SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+ Seq(
+ "--conf",
+ "spark.api.mode=connect",
+ "--remote",
+ "local",
+ "abc/python/pyspark/pipelines/cli.py"
+ )
+ )
+ args = Array("--conf", "spark.api.mode=connect")
+ assert(
+ SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+ Seq(
+ "--conf",
+ "spark.api.mode=connect",
+ "--remote",
+ "local",
+ "abc/python/pyspark/pipelines/cli.py"
+ )
+ )
+ args = Array("--conf", "spark.api.mode= connect")
+ assert(
+ SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+ Seq(
+ "--conf",
+ "spark.api.mode=connect",
+ "--remote",
+ "local",
+ "abc/python/pyspark/pipelines/cli.py"
+ )
+ )
+ args = Array("-c", "spark.api.mode=connect")
+ assert(
+ SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+ Seq(
+ "--conf",
+ "spark.api.mode=connect",
+ "--remote",
+ "local",
+ "abc/python/pyspark/pipelines/cli.py"
+ )
+ )
+ }
+
test("name arg") {
val args = Array(
"init",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]