This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 6e4abe11442 [SPARK-44747][CONNECT] Add missing SparkSession.Builder 
methods
6e4abe11442 is described below

commit 6e4abe11442f2986412909b4ebb4c13487df27b6
Author: Herman van Hovell <her...@databricks.com>
AuthorDate: Thu Aug 10 09:49:45 2023 +0900

    [SPARK-44747][CONNECT] Add missing SparkSession.Builder methods
    
    ### What changes were proposed in this pull request?
    This PR adds a couple of methods to SparkSession.Builder:
    - `config` - this group of methods allows you to set runtime configurations 
on the Spark Connect Session.
    - `master` - this is a no-op, it is only added for compatibility.
    - `appName` - this is a no-op, it is only added for compatibility.
    - `enableHiveSupport` - this is a no-op, it is only added for 
compatibility.
    
    ### Why are the changes needed?
    We want to maximize compatibility with the existing API in sql/core.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. It adds a couple of builder methods.
    
    ### How was this patch tested?
    Add tests to `SparkSessionSuite` and `SparkSessionE2ESuite`.
    
    Closes #42419 from hvanhovell/SPARK-44747.
    
    Authored-by: Herman van Hovell <her...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit d27496eb3bf962981e37f989ba486d847745444f)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../scala/org/apache/spark/sql/SparkSession.scala  | 91 +++++++++++++++++++++-
 .../apache/spark/sql/SparkSessionE2ESuite.scala    | 46 +++++++++++
 .../org/apache/spark/sql/SparkSessionSuite.scala   | 10 +++
 .../CheckConnectJvmClientCompatibility.scala       |  6 --
 4 files changed, 146 insertions(+), 7 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 7367ed153f7..e902e04e246 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -781,6 +781,7 @@ object SparkSession extends Logging {
   class Builder() extends Logging {
     private val builder = SparkConnectClient.builder()
     private var client: SparkConnectClient = _
+    private[this] val options = new scala.collection.mutable.HashMap[String, 
String]
 
     def remote(connectionString: String): Builder = {
       builder.connectionString(connectionString)
@@ -804,6 +805,84 @@ object SparkSession extends Logging {
       this
     }
 
+    /**
+     * Sets a config option. Options set using this method are automatically 
propagated to the
+     * Spark Connect session. Only runtime options are supported.
+     *
+     * @since 3.5.0
+     */
+    def config(key: String, value: String): Builder = synchronized {
+      options += key -> value
+      this
+    }
+
+    /**
+     * Sets a config option. Options set using this method are automatically 
propagated to the
+     * Spark Connect session. Only runtime options are supported.
+     *
+     * @since 3.5.0
+     */
+    def config(key: String, value: Long): Builder = synchronized {
+      options += key -> value.toString
+      this
+    }
+
+    /**
+     * Sets a config option. Options set using this method are automatically 
propagated to the
+     * Spark Connect session. Only runtime options are supported.
+     *
+     * @since 3.5.0
+     */
+    def config(key: String, value: Double): Builder = synchronized {
+      options += key -> value.toString
+      this
+    }
+
+    /**
+     * Sets a config option. Options set using this method are automatically 
propagated to the
+     * Spark Connect session. Only runtime options are supported.
+     *
+     * @since 3.5.0
+     */
+    def config(key: String, value: Boolean): Builder = synchronized {
+      options += key -> value.toString
+      this
+    }
+
+    /**
+     * Sets a config a map of options. Options set using this method are 
automatically propagated
+     * to the Spark Connect session. Only runtime options are supported.
+     *
+     * @since 3.5.0
+     */
+    def config(map: Map[String, Any]): Builder = synchronized {
+      map.foreach { kv: (String, Any) =>
+        {
+          options += kv._1 -> kv._2.toString
+        }
+      }
+      this
+    }
+
+    /**
+     * Sets a config option. Options set using this method are automatically 
propagated to both
+     * `SparkConf` and SparkSession's own configuration.
+     *
+     * @since 3.5.0
+     */
+    def config(map: java.util.Map[String, Any]): Builder = synchronized {
+      config(map.asScala.toMap)
+    }
+
+    @deprecated("enableHiveSupport does not work in Spark Connect")
+    def enableHiveSupport(): Builder = this
+
+    @deprecated("master does not work in Spark Connect, please use remote 
instead")
+    def master(master: String): Builder = this
+
+    @deprecated("appName does not work in Spark Connect")
+    def appName(name: String): Builder = this
+
     private def tryCreateSessionFromClient(): Option[SparkSession] = {
       if (client != null) {
         Option(new SparkSession(client, cleaner, planIdGenerator))
@@ -812,6 +891,12 @@ object SparkSession extends Logging {
       }
     }
 
+    private def applyOptions(session: SparkSession): Unit = {
+      options.foreach { case (key, value) =>
+        session.conf.set(key, value)
+      }
+    }
+
     /**
      * Build the [[SparkSession]].
      *
@@ -833,6 +918,7 @@ object SparkSession extends Logging {
       val session = tryCreateSessionFromClient()
         .getOrElse(SparkSession.this.create(builder.configuration))
       setDefaultAndActiveSession(session)
+      applyOptions(session)
       session
     }
 
@@ -842,7 +928,9 @@ object SparkSession extends Logging {
      * If a session exist with the same configuration that is returned instead 
of creating a new
      * session.
      *
-     * This method will update the default and/or active session if they are 
not set.
+     * This method will update the default and/or active session if they are 
not set. This method
+     * will always set the specified configuration options on the session, 
even when it is not
+     * newly created.
      *
      * @since 3.5.0
      */
@@ -850,6 +938,7 @@ object SparkSession extends Logging {
       val session = tryCreateSessionFromClient()
         .getOrElse(sessions.get(builder.configuration))
       setDefaultAndActiveSession(session)
+      applyOptions(session)
       session
     }
   }
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala
index 86deae982a5..490bdf9cd86 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala
@@ -249,4 +249,50 @@ class SparkSessionE2ESuite extends RemoteSparkSession {
     }
     assert(e.getMessage contains "OPERATION_CANCELED")
   }
+
+  test("option propagation") {
+    val remote = s"sc://localhost:$serverPort"
+    val session1 = SparkSession
+      .builder()
+      .remote(remote)
+      .config("foo", 12L)
+      .config("bar", value = true)
+      .config("bob", 12.0)
+      .config("heading", "north")
+      .getOrCreate()
+    assert(session1.conf.get("foo") == "12")
+    assert(session1.conf.get("bar") == "true")
+    assert(session1.conf.get("bob") == String.valueOf(12.0))
+    assert(session1.conf.get("heading") == "north")
+
+    // Check if new options are applied to an existing session.
+    val session2 = SparkSession
+      .builder()
+      .remote(remote)
+      .config("heading", "south")
+      .getOrCreate()
+    assert(session2 == session1)
+    assert(session2.conf.get("heading") == "south")
+
+    // Create a completely different session, confs are not support to leak.
+    val session3 = SparkSession
+      .builder()
+      .remote(remote)
+      .config(Map("foo" -> "13", "baar" -> "false", "heading" -> "east"))
+      .create()
+    assert(session3 != session1)
+    assert(session3.conf.get("foo") == "13")
+    assert(session3.conf.get("baar") == "false")
+    assert(session3.conf.getOption("bob").isEmpty)
+    assert(session3.conf.get("heading") == "east")
+
+    // Try to set a static conf.
+    intercept[Exception] {
+      SparkSession
+        .builder()
+        .remote(remote)
+        .config("spark.sql.globalTempDatabase", "not_gonna_happen")
+        .create()
+    }
+  }
 }
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala
index 2d7ded2d688..4aa8b4360ee 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala
@@ -251,4 +251,14 @@ class SparkSessionSuite extends ConnectFunSuite {
       executor.shutdown()
     }
   }
+
+  test("deprecated methods") {
+    SparkSession
+      .builder()
+      .master("yayay")
+      .appName("bob")
+      .enableHiveSupport()
+      .create()
+      .close()
+  }
 }
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index 4439a5f3e2a..3fc02d7c397 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -223,14 +223,8 @@ object CheckConnectJvmClientCompatibility {
         "org.apache.spark.sql.SparkSession#implicits._sqlContext"),
 
       // SparkSession#Builder
-      ProblemFilters.exclude[DirectMissingMethodProblem](
-        "org.apache.spark.sql.SparkSession#Builder.appName"),
       ProblemFilters.exclude[DirectMissingMethodProblem](
         "org.apache.spark.sql.SparkSession#Builder.config"),
-      ProblemFilters.exclude[DirectMissingMethodProblem](
-        "org.apache.spark.sql.SparkSession#Builder.master"),
-      ProblemFilters.exclude[DirectMissingMethodProblem](
-        "org.apache.spark.sql.SparkSession#Builder.enableHiveSupport"),
       ProblemFilters.exclude[DirectMissingMethodProblem](
         "org.apache.spark.sql.SparkSession#Builder.withExtensions"),
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to