This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 917bc8cb927 [SPARK-45360][SQL][CONNECT] Initialize spark session 
builder configuration from SPARK_REMOTE
917bc8cb927 is described below

commit 917bc8cb92728267fb93891f4ef9da13c06e4589
Author: Yihong He <yihong...@databricks.com>
AuthorDate: Thu Sep 28 12:58:07 2023 -0400

    [SPARK-45360][SQL][CONNECT] Initialize spark session builder configuration 
from SPARK_REMOTE
    
    ### What changes were proposed in this pull request?
    
    - Initialize spark session builder configuration from SPARK_REMOTE
    
    ### Why are the changes needed?
    
    - `SparkSession.builder().getOrCreate()` should follow the behavior 
documents 
[here](https://github.com/apache/spark/blob/2cc1ee4d3a05a641d7a245f015ef824d8f7bae8b/docs/spark-connect-overview.md?plain=1#L241-L244)
 and support initialization from SPARK_REMOTE
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    - `build/sbt "connect-client-jvm/testOnly *SparkConnectClientSuite"`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Closes #43153 from heyihong/SPARK-45360.
    
    Authored-by: Yihong He <yihong...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
    (cherry picked from commit 183a3d761f36d35572cfb37ab921b6a86f8f28ed)
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../scala/org/apache/spark/sql/SparkSession.scala  |  5 +-
 .../connect/client/SparkConnectClientSuite.scala   | 61 ++++++++++++++++++++++
 2 files changed, 65 insertions(+), 1 deletion(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 7bd8fa59aea..421f37b9e8a 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -783,7 +783,10 @@ object SparkSession extends Logging {
   }
 
   class Builder() extends Logging {
-    private val builder = SparkConnectClient.builder()
+    // Initialize the connection string of the Spark Connect client builder 
from SPARK_REMOTE
+    // by default, if it exists. The connection string can be overridden using
+    // the remote() function, as it takes precedence over the SPARK_REMOTE 
environment variable.
+    private val builder = SparkConnectClient.builder().loadFromEnvironment()
     private var client: SparkConnectClient = _
     private[this] val options = new scala.collection.mutable.HashMap[String, 
String]
 
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index 80e245ec78b..89acc2c60ac 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -86,6 +86,24 @@ class SparkConnectClientSuite extends ConnectFunSuite with 
BeforeAndAfterEach {
     assert(response.getSessionId === "abc123")
   }
 
+  private def withEnvs(pairs: (String, String)*)(f: => Unit): Unit = {
+    val readonlyEnv = System.getenv()
+    val field = readonlyEnv.getClass.getDeclaredField("m")
+    field.setAccessible(true)
+    val modifiableEnv = 
field.get(readonlyEnv).asInstanceOf[java.util.Map[String, String]]
+    try {
+      for ((k, v) <- pairs) {
+        assert(!modifiableEnv.containsKey(k))
+        modifiableEnv.put(k, v)
+      }
+      f
+    } finally {
+      for ((k, _) <- pairs) {
+        modifiableEnv.remove(k)
+      }
+    }
+  }
+
   test("Test connection") {
     testClientConnection() { testPort => 
SparkConnectClient.builder().port(testPort).build() }
   }
@@ -112,6 +130,49 @@ class SparkConnectClientSuite extends ConnectFunSuite with 
BeforeAndAfterEach {
     }
   }
 
+  test("SparkSession create with SPARK_REMOTE") {
+    startDummyServer(0)
+
+    withEnvs("SPARK_REMOTE" -> s"sc://localhost:${server.getPort}") {
+      val session = SparkSession.builder().create()
+      val df = session.range(10)
+      df.analyze // Trigger RPC
+      assert(df.plan === service.getAndClearLatestInputPlan())
+
+      val session2 = SparkSession.builder().create()
+      assert(session != session2)
+    }
+  }
+
+  test("SparkSession getOrCreate with SPARK_REMOTE") {
+    startDummyServer(0)
+
+    withEnvs("SPARK_REMOTE" -> s"sc://localhost:${server.getPort}") {
+      val session = SparkSession.builder().getOrCreate()
+
+      val df = session.range(10)
+      df.analyze // Trigger RPC
+      assert(df.plan === service.getAndClearLatestInputPlan())
+
+      val session2 = SparkSession.builder().getOrCreate()
+      assert(session === session2)
+    }
+  }
+
+  test("Builder.remote takes precedence over SPARK_REMOTE") {
+    startDummyServer(0)
+    val incorrectUrl = s"sc://localhost:${server.getPort + 1}"
+
+    withEnvs("SPARK_REMOTE" -> incorrectUrl) {
+      val session =
+        
SparkSession.builder().remote(s"sc://localhost:${server.getPort}").getOrCreate()
+
+      val df = session.range(10)
+      df.analyze // Trigger RPC
+      assert(df.plan === service.getAndClearLatestInputPlan())
+    }
+  }
+
   test("SparkSession initialisation with connection string") {
     startDummyServer(0)
     client = SparkConnectClient


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to