This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new ae1a5c9b86f [SPARK-44636][CONNECT] Leave no dangling iterators
ae1a5c9b86f is described below

commit ae1a5c9b86f81dfed7471bda6db48f04eb6906ae
Author: Alice Sayutina <alice.sayut...@databricks.com>
AuthorDate: Wed Aug 2 17:41:47 2023 -0400

    [SPARK-44636][CONNECT] Leave no dangling iterators
    
    ### What changes were proposed in this pull request?
    Minorly refactored execute functions to not leave dangling iterators
    
    (Note: we also should do that with SparkResult, however in almost all cases 
there should be no problem with iterators not consumed).
    
    ### Why are the changes needed?
    
    Needed for ongoing work regarding session reattachment.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    This is intended to be tested after session reattachment is complete (cc 
juliuszsompolski).
    
    Closes #42298 from cdkrot/dangling_iterators.
    
    Lead-authored-by: Alice Sayutina <alice.sayut...@databricks.com>
    Co-authored-by: Alice Sayutina <cdkr...@gmail.com>
    Co-authored-by: Juliusz Sompolski <ju...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
    (cherry picked from commit 784f1d0da7f9d96bbc8ab2dda9d9556691012e17)
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../scala/org/apache/spark/sql/SparkSession.scala  | 25 +++++++++++++---------
 .../sql/connect/client/SparkConnectClient.scala    |  6 ++++++
 .../connect/client/SparkConnectClientSuite.scala   | 19 ++++++++++++++++
 3 files changed, 40 insertions(+), 10 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index a3d82156a03..59f3f3526ab 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -252,10 +252,12 @@ class SparkSession private[sql] (
           .setSql(sqlText)
           .addAllPosArgs(args.map(toLiteralProto).toIterable.asJava)))
     val plan = proto.Plan.newBuilder().setCommand(cmd)
-    val responseIter = client.execute(plan.build())
+    val responseSeq = client.execute(plan.build()).asScala.toSeq
 
-    // Note: .toSeq makes the stream be consumed and closed.
-    val response = responseIter.asScala.toSeq
+    // sequence is a lazy stream, force materialize it to make sure it is 
consumed.
+    responseSeq.foreach(_ => ())
+
+    val response = responseSeq
       .find(_.hasSqlCommandResult)
       .getOrElse(throw new RuntimeException("SQLCommandResult must be 
present"))
 
@@ -309,10 +311,12 @@ class SparkSession private[sql] (
             .setSql(sqlText)
             .putAllArgs(args.asScala.mapValues(toLiteralProto).toMap.asJava)))
       val plan = proto.Plan.newBuilder().setCommand(cmd)
-      val responseIter = client.execute(plan.build())
+      val responseSeq = client.execute(plan.build()).asScala.toSeq
+
+      // sequence is a lazy stream, force materialize it to make sure it is 
consumed.
+      responseSeq.foreach(_ => ())
 
-      // Note: .toSeq makes the stream be consumed and closed.
-      val response = responseIter.asScala.toSeq
+      val response = responseSeq
         .find(_.hasSqlCommandResult)
         .getOrElse(throw new RuntimeException("SQLCommandResult must be 
present"))
 
@@ -549,14 +553,15 @@ class SparkSession private[sql] (
 
   private[sql] def execute(command: proto.Command): Seq[ExecutePlanResponse] = 
{
     val plan = proto.Plan.newBuilder().setCommand(command).build()
-    client.execute(plan).asScala.toSeq
+    val seq = client.execute(plan).asScala.toSeq
+    // sequence is a lazy stream, force materialize it to make sure it is 
consumed.
+    seq.foreach(_ => ())
+    seq
   }
 
   private[sql] def registerUdf(udf: proto.CommonInlineUserDefinedFunction): 
Unit = {
     val command = proto.Command.newBuilder().setRegisterFunction(udf).build()
-    val plan = proto.Plan.newBuilder().setCommand(command).build()
-
-    client.execute(plan).asScala.foreach(_ => ())
+    execute(command)
   }
 
   @DeveloperApi
diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index aac5e6b9cc3..3d20be88888 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -72,6 +72,12 @@ private[sql] class SparkConnectClient(
     bstub.analyzePlan(request)
   }
 
+  /**
+   * Execute the plan and return response iterator.
+   *
+   * It returns an open iterator. The caller needs to ensure that this 
iterator is fully consumed,
+   * otherwise resources held by a re-attachable query may be left dangling 
until server timeout.
+   */
   def execute(plan: proto.Plan): java.util.Iterator[proto.ExecutePlanResponse] 
= {
     artifactManager.uploadAllClassFileArtifacts()
     val request = proto.ExecutePlanRequest
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index ec447cac869..3436037809d 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.connect.client
 
+import java.util.UUID
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
@@ -292,12 +293,30 @@ class DummySparkConnectService() extends 
SparkConnectServiceGrpc.SparkConnectSer
       responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
     // Reply with a dummy response using the same client ID
     val requestSessionId = request.getSessionId
+    val operationId = if (request.hasOperationId) {
+      request.getOperationId
+    } else {
+      UUID.randomUUID().toString
+    }
     inputPlan = request.getPlan
     val response = ExecutePlanResponse
       .newBuilder()
       .setSessionId(requestSessionId)
+      .setOperationId(operationId)
       .build()
     responseObserver.onNext(response)
+    // Reattachable execute must end with ResultComplete
+    if (request.getRequestOptionsList.asScala.exists { option =>
+        option.hasReattachOptions && option.getReattachOptions.getReattachable 
== true
+      }) {
+      val resultComplete = ExecutePlanResponse
+        .newBuilder()
+        .setSessionId(requestSessionId)
+        .setOperationId(operationId)
+        
.setResultComplete(proto.ExecutePlanResponse.ResultComplete.newBuilder().build())
+        .build()
+      responseObserver.onNext(resultComplete)
+    }
     responseObserver.onCompleted()
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to