This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 183a3d761f3  [SPARK-45360][SQL][CONNECT] Initialize spark session builder configuration from SPARK_REMOTE
183a3d761f3 is described below

commit 183a3d761f36d35572cfb37ab921b6a86f8f28ed
Author: Yihong He <yihong...@databricks.com>
AuthorDate: Thu Sep 28 12:58:07 2023 -0400

    [SPARK-45360][SQL][CONNECT] Initialize spark session builder configuration from SPARK_REMOTE

    ### What changes were proposed in this pull request?

    - Initialize the spark session builder configuration from SPARK_REMOTE

    ### Why are the changes needed?

    - `SparkSession.builder().getOrCreate()` should follow the behavior documented [here](https://github.com/apache/spark/blob/2cc1ee4d3a05a641d7a245f015ef824d8f7bae8b/docs/spark-connect-overview.md?plain=1#L241-L244) and support initialization from SPARK_REMOTE

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    - `build/sbt "connect-client-jvm/testOnly *SparkConnectClientSuite"`

    ### Was this patch authored or co-authored using generative AI tooling?

    Closes #43153 from heyihong/SPARK-45360.

    Authored-by: Yihong He <yihong...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../scala/org/apache/spark/sql/SparkSession.scala  |  5 +-
 .../connect/client/SparkConnectClientSuite.scala   | 61 ++++++++++++++++++++++
 2 files changed, 65 insertions(+), 1 deletion(-)
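For illustration, a minimal sketch of the user-facing behavior this change enables. This is not part of the commit; `sc://localhost:15002` is a hypothetical endpoint standing in for a running Spark Connect server, while `remote()` and `getOrCreate()` are the builder methods exercised by the diff below:

    import org.apache.spark.sql.SparkSession

    // With SPARK_REMOTE=sc://localhost:15002 exported in the environment,
    // the builder now picks up the connection string automatically:
    val session = SparkSession.builder().getOrCreate()

    // An explicit remote() still takes precedence over SPARK_REMOTE:
    val session2 = SparkSession.builder()
      .remote("sc://other-host:15002")
      .getOrCreate()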
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index abe4d6a96e6..42052e3f8e6 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -786,7 +786,10 @@ object SparkSession extends Logging {
   }
 
   class Builder() extends Logging {
-    private val builder = SparkConnectClient.builder()
+    // Initialize the connection string of the Spark Connect client builder from SPARK_REMOTE
+    // by default, if it exists. The connection string can be overridden using
+    // the remote() function, as it takes precedence over the SPARK_REMOTE environment variable.
+    private val builder = SparkConnectClient.builder().loadFromEnvironment()
     private var client: SparkConnectClient = _
     private[this] val options = new scala.collection.mutable.HashMap[String, String]
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index 488118d0552..57e0b4016f1 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -86,6 +86,24 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
     assert(response.getSessionId === "abc123")
   }
 
+  private def withEnvs(pairs: (String, String)*)(f: => Unit): Unit = {
+    val readonlyEnv = System.getenv()
+    val field = readonlyEnv.getClass.getDeclaredField("m")
+    field.setAccessible(true)
+    val modifiableEnv = field.get(readonlyEnv).asInstanceOf[java.util.Map[String, String]]
+    try {
+      for ((k, v) <- pairs) {
+        assert(!modifiableEnv.containsKey(k))
+        modifiableEnv.put(k, v)
+      }
+      f
+    } finally {
+      for ((k, _) <- pairs) {
+        modifiableEnv.remove(k)
+      }
+    }
+  }
+
   test("Test connection") {
     testClientConnection() { testPort => SparkConnectClient.builder().port(testPort).build() }
   }
@@ -112,6 +130,49 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
     }
   }
 
+  test("SparkSession create with SPARK_REMOTE") {
+    startDummyServer(0)
+
+    withEnvs("SPARK_REMOTE" -> s"sc://localhost:${server.getPort}") {
+      val session = SparkSession.builder().create()
+      val df = session.range(10)
+      df.analyze // Trigger RPC
+      assert(df.plan === service.getAndClearLatestInputPlan())
+
+      val session2 = SparkSession.builder().create()
+      assert(session != session2)
+    }
+  }
+
+  test("SparkSession getOrCreate with SPARK_REMOTE") {
+    startDummyServer(0)
+
+    withEnvs("SPARK_REMOTE" -> s"sc://localhost:${server.getPort}") {
+      val session = SparkSession.builder().getOrCreate()
+
+      val df = session.range(10)
+      df.analyze // Trigger RPC
+      assert(df.plan === service.getAndClearLatestInputPlan())
+
+      val session2 = SparkSession.builder().getOrCreate()
+      assert(session === session2)
+    }
+  }
+
+  test("Builder.remote takes precedence over SPARK_REMOTE") {
+    startDummyServer(0)
+    val incorrectUrl = s"sc://localhost:${server.getPort + 1}"
+
+    withEnvs("SPARK_REMOTE" -> incorrectUrl) {
+      val session =
+        SparkSession.builder().remote(s"sc://localhost:${server.getPort}").getOrCreate()
+
+      val df = session.range(10)
+      df.analyze // Trigger RPC
+      assert(df.plan === service.getAndClearLatestInputPlan())
+    }
+  }
+
   test("SparkSession initialisation with connection string") {
     startDummyServer(0)
     client = SparkConnectClient
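A note on the `withEnvs` helper in the test diff above: `System.getenv()` returns an unmodifiable view of the process environment, so the helper mutates the JVM's cached copy through reflection. Below is a standalone sketch of the same technique, not code from the commit; it assumes the view is a `java.util.Collections$UnmodifiableMap` whose backing map sits in a private field named "m" (true on common JVMs, and on JDK 16+ it additionally needs `--add-opens java.base/java.util=ALL-UNNAMED`):

    // Test-only trick: temporarily inject an environment variable by mutating
    // the JVM's cached environment map through reflection, then restore it.
    def withEnv(key: String, value: String)(f: => Unit): Unit = {
      val env = System.getenv() // unmodifiable view of the process environment
      val field = env.getClass.getDeclaredField("m") // backing map of the wrapper (JDK-internal detail)
      field.setAccessible(true)
      val writable = field.get(env).asInstanceOf[java.util.Map[String, String]]
      val previous = writable.put(key, value) // remember any prior value
      try f
      finally {
        if (previous == null) writable.remove(key) else writable.put(key, previous)
      }
    }

    // Example: run a block with SPARK_REMOTE pointing at a (hypothetical) server.
    withEnv("SPARK_REMOTE", "sc://localhost:15002") {
      // SparkSession.builder().getOrCreate() would connect here
    }

Unlike the helper in the diff, which asserts the key is absent and simply removes it afterwards, this variant restores any pre-existing value.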