This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new ad2c1449803 [SPARK-42667][CONNECT] Spark Connect: newSession API
ad2c1449803 is described below

commit ad2c1449803567462f3a2a19f71b36186f2dad44
Author: Rui Wang <rui.w...@databricks.com>
AuthorDate: Fri Mar 3 21:45:18 2023 -0400

    [SPARK-42667][CONNECT] Spark Connect: newSession API
    
    ### What changes were proposed in this pull request?
    
    This PR proposes an implementation of the newSession API. The idea is to reuse 
the user context (e.g. user_id), the gRPC channel, etc., but to distinguish different 
Spark Remote Sessions by client id, which is generated randomly.
    
    So this idea has the benefits of:
    1. Reusing the gRPC channel, to avoid opening too many connections to the server.
    2. Each user can have multiple remote sessions, differentiated by client ids 
(or named session ids on the server side).
    
    ### Why are the changes needed?
    
    API coverage
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    ### How was this patch tested?
    
    UT
    
    Closes #40272 from amaliujia/new_session.
    
    Authored-by: Rui Wang <rui.w...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
    (cherry picked from commit 00b7ca3074094822b3b5b3da1b292c6d25dca220)
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala      | 2 +-
 .../org/apache/spark/sql/connect/client/SparkConnectClient.scala    | 4 ++++
 .../src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala    | 6 ++++++
 3 files changed, 11 insertions(+), 1 deletion(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index a8a88d63b1a..2b032b7cc8a 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -344,7 +344,7 @@ class SparkSession private[sql] (
   // scalastyle:on
 
   def newSession(): SparkSession = {
-    throw new UnsupportedOperationException("newSession is not supported")
+    SparkSession.builder().client(client.copy()).build()
   }
 
   private def range(
diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 599aab441de..8828a4a87e6 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -149,6 +149,10 @@ private[sql] class SparkConnectClient(
     analyze(request)
   }
 
+  def copy(): SparkConnectClient = {
+    new SparkConnectClient(userContext, channel, userAgent)
+  }
+
   /**
    * Add a single artifact to the client session.
    *
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index ffbf3cee025..a3f1de55892 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -586,6 +586,12 @@ class ClientE2ETestSuite extends RemoteSparkSession {
       list.asScala.map(kv => Row(kv.key, kv.value)),
       session.createDataFrame(list.asScala.toSeq))
   }
+
+  test("SparkSession newSession") {
+    val oldId = spark.sql("SELECT 1").analyze.getClientId
+    val newId = spark.newSession().sql("SELECT 1").analyze.getClientId
+    assert(oldId != newId)
+  }
 }
 
 private[sql] case class MyType(id: Long, a: Double, b: Double)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to