This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new d18305d5e93 [SPARK-44656][CONNECT] Make all iterators CloseableIterators
d18305d5e93 is described below

commit d18305d5e9312a438317b9b2ff800f2c074e3917
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Fri Aug 4 16:06:57 2023 +0200

    [SPARK-44656][CONNECT] Make all iterators CloseableIterators
    
    ### What changes were proposed in this pull request?
    
    This makes sure that all iterators used in the Spark Connect Scala client are
    `CloseableIterator` (a usage sketch follows the list below).
    
    1. Makes `CustomSparkConnectBlockingStub.executePlan` return `CloseableIterator`
       and makes all wrappers respect that.
    2. Makes `ExecutePlanResponseReattachableIterator` a `CloseableIterator`, with an
       implementation that informs the server via ReleaseExecute that the query result
       can be released.
    3. Makes `SparkResult.iterator` explicitly a `CloseableIterator`, and also registers
       the `SparkResult.responses` iterator with the `SparkResultCloseable` cleaner, so
       that it is closed upon GC if not closed explicitly sooner.
    4. Because `Dataset.toLocalIterator` requires a Java iterator, implements a
       conversion to `java.util.Iterator with AutoCloseable` that is returned there.
    5. Uses `CloseableIterator` consistently everywhere else, which removes the need
       to convert between iterator types.
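    
    A minimal, hypothetical usage sketch (not code from this PR) of consuming and
    closing the `CloseableIterator` returned by `SparkConnectClient.execute`, where
    `process` stands in for arbitrary per-response handling:
    
        val responses = client.execute(plan)  // CloseableIterator[ExecutePlanResponse]
        try {
          responses.foreach(process)
        } finally {
          // For reattachable executions, close() lets the server release buffered results.
          responses.close()
        }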
    
    ### Why are the changes needed?
    
    Properly closeable iterators are needed for resource management and, with
    reattachable execution, to inform the server that processing has finished.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Exercised by the current E2E tests.
    
    Co-authored-by: Alice Sayutina <alice.sayutina@databricks.com>
    
    Closes #42331 from juliuszsompolski/closeable_iterators.
    
    Lead-authored-by: Juliusz Sompolski <ju...@databricks.com>
    Co-authored-by: Alice Sayutina <alice.sayut...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
    (cherry picked from commit 84ea6f242e4982187edc0a8f5786e7dc69ec31d7)
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  4 +-
 .../scala/org/apache/spark/sql/SparkSession.scala  | 21 ++++------
 .../sql/connect/client/CloseableIterator.scala     | 46 ++++++++++++++++++++++
 .../client/CustomSparkConnectBlockingStub.scala    | 10 +++--
 .../ExecutePlanResponseReattachableIterator.scala  | 34 +++++++++-------
 .../connect/client/GrpcExceptionConverter.scala    | 10 ++++-
 .../sql/connect/client/GrpcRetryHandler.scala      | 24 ++++++-----
 .../sql/connect/client/SparkConnectClient.scala    |  7 ++--
 .../spark/sql/connect/client/SparkResult.scala     | 37 +++++++++--------
 .../connect/client/arrow/ArrowDeserializer.scala   |  1 +
 .../connect/client/arrow/ArrowEncoderUtils.scala   |  2 -
 .../sql/connect/client/arrow/ArrowSerializer.scala |  1 +
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  |  2 +-
 .../connect/client/arrow/ArrowEncoderSuite.scala   |  1 +
 14 files changed, 133 insertions(+), 67 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 0f7b376955c..8a7dce3987a 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2832,7 +2832,7 @@ class Dataset[T] private[sql] (
   /**
    * Returns an iterator that contains all rows in this Dataset.
    *
-   * The returned iterator implements [[AutoCloseable]]. For memory management it is better to
+   * The returned iterator implements [[AutoCloseable]]. For resource management it is better to
    * close it once you are done. If you don't close it, it and the underlying data will be cleaned
    * up once the iterator is garbage collected.
    *
@@ -2840,7 +2840,7 @@ class Dataset[T] private[sql] (
    * @since 3.4.0
    */
   def toLocalIterator(): java.util.Iterator[T] = {
-    collectResult().destructiveIterator
+    collectResult().destructiveIterator.asJava
   }
 
   /**
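
A minimal, hypothetical sketch (not part of the diff) of closing the iterator returned
by `toLocalIterator()` explicitly, assuming `ds` is a `Dataset[T]` and `handle` is a
placeholder; per the change above, the returned `java.util.Iterator[T]` also implements
`AutoCloseable`:

    val it = ds.toLocalIterator()
    try {
      while (it.hasNext) handle(it.next())
    } finally {
      it.asInstanceOf[AutoCloseable].close()
    }
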
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 59f3f3526ab..355d7edadc7 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -252,10 +252,8 @@ class SparkSession private[sql] (
           .setSql(sqlText)
           .addAllPosArgs(args.map(toLiteralProto).toIterable.asJava)))
     val plan = proto.Plan.newBuilder().setCommand(cmd)
-    val responseSeq = client.execute(plan.build()).asScala.toSeq
-
-    // sequence is a lazy stream, force materialize it to make sure it is consumed.
-    responseSeq.foreach(_ => ())
+    // .toBuffer forces that the iterator is consumed and closed
+    val responseSeq = client.execute(plan.build()).toBuffer.toSeq
 
     val response = responseSeq
       .find(_.hasSqlCommandResult)
@@ -311,10 +309,8 @@ class SparkSession private[sql] (
             .setSql(sqlText)
             .putAllArgs(args.asScala.mapValues(toLiteralProto).toMap.asJava)))
       val plan = proto.Plan.newBuilder().setCommand(cmd)
-      val responseSeq = client.execute(plan.build()).asScala.toSeq
-
-      // sequence is a lazy stream, force materialize it to make sure it is consumed.
-      responseSeq.foreach(_ => ())
+      // .toBuffer forces that the iterator is consumed and closed
+      val responseSeq = client.execute(plan.build()).toBuffer.toSeq
 
       val response = responseSeq
         .find(_.hasSqlCommandResult)
@@ -548,15 +544,14 @@ class SparkSession private[sql] (
     f(builder)
     builder.getCommonBuilder.setPlanId(planIdGenerator.getAndIncrement())
     val plan = proto.Plan.newBuilder().setRoot(builder).build()
-    client.execute(plan).asScala.foreach(_ => ())
+    // .toBuffer forces that the iterator is consumed and closed
+    client.execute(plan).toBuffer
   }
 
   private[sql] def execute(command: proto.Command): Seq[ExecutePlanResponse] = {
     val plan = proto.Plan.newBuilder().setCommand(command).build()
-    val seq = client.execute(plan).asScala.toSeq
-    // sequence is a lazy stream, force materialize it to make sure it is consumed.
-    seq.foreach(_ => ())
-    seq
+    // .toBuffer forces that the iterator is consumed and closed
+    client.execute(plan).toBuffer.toSeq
   }
 
   private[sql] def registerUdf(udf: proto.CommonInlineUserDefinedFunction): Unit = {
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CloseableIterator.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CloseableIterator.scala
new file mode 100644
index 00000000000..891e50ed6e7
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CloseableIterator.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+private[sql] trait CloseableIterator[E] extends Iterator[E] with AutoCloseable { self =>
+  def asJava: java.util.Iterator[E] = new java.util.Iterator[E] with AutoCloseable {
+    override def next() = self.next()
+
+    override def hasNext() = self.hasNext
+
+    override def close() = self.close()
+  }
+}
+
+private[sql] object CloseableIterator {
+
+  /**
+   * Wrap iterator to get CloseableIterator, if it wasn't closeable already.
+   */
+  def apply[T](iterator: Iterator[T]): CloseableIterator[T] = iterator match {
+    case closeable: CloseableIterator[T] => closeable
+    case _ =>
+      new CloseableIterator[T] {
+        override def next(): T = iterator.next()
+
+        override def hasNext(): Boolean = iterator.hasNext
+
+        override def close() = { /* empty */ }
+      }
+  }
+}
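
A minimal, hypothetical usage sketch (not part of the diff) of the two entry points
added above: `CloseableIterator.apply` wraps a plain Scala iterator with a no-op
`close()`, and `asJava` exposes it as a Java iterator that also mixes in `AutoCloseable`:

    val wrapped: CloseableIterator[Int] = CloseableIterator(Iterator(1, 2, 3))
    val javaIter: java.util.Iterator[Int] = wrapped.asJava
    while (javaIter.hasNext) println(javaIter.next())
    javaIter.asInstanceOf[AutoCloseable].close()  // delegates to wrapped.close(), a no-op here
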
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala
index bb20901eade..73ff01e223f 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.connect.client
 
+import scala.collection.JavaConverters._
+
 import io.grpc.ManagedChannel
 
 import org.apache.spark.connect.proto._
@@ -27,15 +29,17 @@ private[client] class CustomSparkConnectBlockingStub(
   private val stub = SparkConnectServiceGrpc.newBlockingStub(channel)
   private val retryHandler = new GrpcRetryHandler(retryPolicy)
 
-  def executePlan(request: ExecutePlanRequest): java.util.Iterator[ExecutePlanResponse] = {
+  def executePlan(request: ExecutePlanRequest): CloseableIterator[ExecutePlanResponse] = {
     GrpcExceptionConverter.convert {
       GrpcExceptionConverter.convertIterator[ExecutePlanResponse](
-        retryHandler.RetryIterator(request, stub.executePlan))
+        retryHandler.RetryIterator[ExecutePlanRequest, ExecutePlanResponse](
+          request,
+          r => CloseableIterator(stub.executePlan(r).asScala)))
     }
   }
 
   def executePlanReattachable(
-      request: ExecutePlanRequest): java.util.Iterator[ExecutePlanResponse] = {
+      request: ExecutePlanRequest): CloseableIterator[ExecutePlanResponse] = {
     GrpcExceptionConverter.convert {
       GrpcExceptionConverter.convertIterator[ExecutePlanResponse](
         // Don't use retryHandler - own retry handling is inside.
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
index 41648c3c100..d412d9b5770 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
@@ -50,7 +50,7 @@ class ExecutePlanResponseReattachableIterator(
     request: proto.ExecutePlanRequest,
     channel: ManagedChannel,
     retryPolicy: GrpcRetryHandler.RetryPolicy)
-    extends java.util.Iterator[proto.ExecutePlanResponse]
+    extends CloseableIterator[proto.ExecutePlanResponse]
     with Logging {
 
   val operationId = if (request.hasOperationId) {
@@ -90,8 +90,8 @@ class ExecutePlanResponseReattachableIterator(
 
   // Initial iterator comes from ExecutePlan request.
   // Note: This is not retried, because no error would ever be thrown here, and GRPC will only
-  // throw error on first iterator.hasNext() or iterator.next()
-  private var iterator: java.util.Iterator[proto.ExecutePlanResponse] =
+  // throw error on first iter.hasNext() or iter.next()
+  private var iter: java.util.Iterator[proto.ExecutePlanResponse] =
     rawBlockingStub.executePlan(initialRequest)
 
   override def next(): proto.ExecutePlanResponse = synchronized {
@@ -105,11 +105,11 @@ class ExecutePlanResponseReattachableIterator(
       var firstTry = true
       val ret = retry {
         if (firstTry) {
-          // on first try, we use the existing iterator.
+          // on first try, we use the existing iter.
           firstTry = false
         } else {
-          // on retry, the iterator is borked, so we need a new one
-          iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest())
+          // on retry, the iter is borked, so we need a new one
+          iter = rawBlockingStub.reattachExecute(createReattachExecuteRequest())
         }
         callIter(_.next())
       }
@@ -138,23 +138,23 @@ class ExecutePlanResponseReattachableIterator(
     try {
       retry {
         if (firstTry) {
-          // on first try, we use the existing iterator.
+          // on first try, we use the existing iter.
           firstTry = false
         } else {
-          // on retry, the iterator is borked, so we need a new one
-          iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest())
+          // on retry, the iter is borked, so we need a new one
+          iter = rawBlockingStub.reattachExecute(createReattachExecuteRequest())
         }
         var hasNext = callIter(_.hasNext())
         // Graceful reattach:
-        // If iterator ended, but there was no ResultComplete, it means that there is more,
+        // If iter ended, but there was no ResultComplete, it means that there is more,
         // and we need to reattach.
         if (!hasNext && !resultComplete) {
           do {
-            iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest())
+            iter = rawBlockingStub.reattachExecute(createReattachExecuteRequest())
             assert(!resultComplete) // shouldn't change...
             hasNext = callIter(_.hasNext())
-            // It's possible that the new iterator will be empty, so we need to loop to get another.
-            // Eventually, there will be a non empty iterator, because there is always a
+            // It's possible that the new iter will be empty, so we need to loop to get another.
+            // Eventually, there will be a non empty iter, because there is always a
             // ResultComplete inserted by the server at the end of the stream.
           } while (!hasNext)
         }
@@ -167,6 +167,10 @@ class ExecutePlanResponseReattachableIterator(
     }
   }
 
+  override def close(): Unit = {
+    releaseAll()
+  }
+
   /**
    * Inform the server to release the buffered execution results until and including given result.
    *
@@ -204,7 +208,7 @@ class ExecutePlanResponseReattachableIterator(
    */
   private def callIter[V](iterFun: java.util.Iterator[proto.ExecutePlanResponse] => V) = {
     try {
-      iterFun(iterator)
+      iterFun(iter)
     } catch {
       case ex: StatusRuntimeException
           if StatusProto
@@ -217,7 +221,7 @@ class ExecutePlanResponseReattachableIterator(
             ex)
         }
         // Try a new ExecutePlan, and throw upstream for retry.
-        iterator = rawBlockingStub.executePlan(initialRequest)
+        iter = rawBlockingStub.executePlan(initialRequest)
         throw new GrpcRetryHandler.RetryException
     }
   }
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
index 1a42ec821d8..7ff3421a5a0 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
@@ -31,8 +31,8 @@ private[client] object GrpcExceptionConverter {
     }
   }
 
-  def convertIterator[T](iter: java.util.Iterator[T]): java.util.Iterator[T] = {
-    new java.util.Iterator[T] {
+  def convertIterator[T](iter: CloseableIterator[T]): CloseableIterator[T] = {
+    new CloseableIterator[T] {
       override def hasNext: Boolean = {
         convert {
           iter.hasNext
@@ -44,6 +44,12 @@ private[client] object GrpcExceptionConverter {
           iter.next()
         }
       }
+
+      override def close(): Unit = {
+        convert {
+          iter.close()
+        }
+      }
     }
   }
 
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
index 47ff975b267..6dad5b4b3a9 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
@@ -45,13 +45,13 @@ private[client] class GrpcRetryHandler(private val retryPolicy: GrpcRetryHandler
    * @tparam U
    *   The type of the response.
    */
-  class RetryIterator[T, U](request: T, call: T => java.util.Iterator[U])
-      extends java.util.Iterator[U] {
+  class RetryIterator[T, U](request: T, call: T => CloseableIterator[U])
+      extends CloseableIterator[U] {
 
     private var opened = false // we only retry if it fails on first call when using the iterator
-    private var iterator = call(request)
+    private var iter = call(request)
 
-    private def retryIter[V](f: java.util.Iterator[U] => V) = {
+    private def retryIter[V](f: Iterator[U] => V) = {
       if (!opened) {
         opened = true
         var firstTry = true
@@ -61,26 +61,30 @@ private[client] class GrpcRetryHandler(private val retryPolicy: GrpcRetryHandler
             firstTry = false
           } else {
             // on retry, we need to call the RPC again.
-            iterator = call(request)
+            iter = call(request)
           }
-          f(iterator)
+          f(iter)
         }
       } else {
-        f(iterator)
+        f(iter)
       }
     }
 
     override def next: U = {
-      retryIter(_.next())
+      retryIter(_.next)
     }
 
     override def hasNext: Boolean = {
-      retryIter(_.hasNext())
+      retryIter(_.hasNext)
+    }
+
+    override def close(): Unit = {
+      iter.close()
     }
   }
 
   object RetryIterator {
-    def apply[T, U](request: T, call: T => java.util.Iterator[U]): RetryIterator[T, U] =
+    def apply[T, U](request: T, call: T => CloseableIterator[U]): RetryIterator[T, U] =
       new RetryIterator(request, call)
   }
 
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 3d20be88888..a028df536cf 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -75,10 +75,11 @@ private[sql] class SparkConnectClient(
   /**
    * Execute the plan and return response iterator.
    *
-   * It returns an open iterator. The caller needs to ensure that this iterator is fully consumed,
-   * otherwise resources held by a re-attachable query may be left dangling until server timeout.
+   * It returns CloseableIterator. For resource management it is better to close it once you are
+   * done. If you don't close it, it and the underlying data will be cleaned up once the iterator
+   * is garbage collected.
    */
-  def execute(plan: proto.Plan): java.util.Iterator[proto.ExecutePlanResponse] = {
+  def execute(plan: proto.Plan): CloseableIterator[proto.ExecutePlanResponse] = {
     artifactManager.uploadAllClassFileArtifacts()
     val request = proto.ExecutePlanRequest
       .newBuilder()
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala
index 93c32aa2954..609e84779fb 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala
@@ -27,14 +27,14 @@ import org.apache.arrow.vector.types.pojo
 import org.apache.spark.connect.proto
 import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ProductEncoder, UnboundRowEncoder}
-import org.apache.spark.sql.connect.client.arrow.{AbstractMessageIterator, ArrowDeserializingIterator, CloseableIterator, ConcatenatingArrowStreamReader, MessageIterator}
+import org.apache.spark.sql.connect.client.arrow.{AbstractMessageIterator, ArrowDeserializingIterator, ConcatenatingArrowStreamReader, MessageIterator}
 import org.apache.spark.sql.connect.client.util.Cleanable
 import org.apache.spark.sql.connect.common.DataTypeProtoConverter
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.ArrowUtils
 
 private[sql] class SparkResult[T](
-    responses: java.util.Iterator[proto.ExecutePlanResponse],
+    responses: CloseableIterator[proto.ExecutePlanResponse],
     allocator: BufferAllocator,
     encoder: AgnosticEncoder[T],
     timeZoneId: String)
@@ -198,22 +198,22 @@ private[sql] class SparkResult[T](
   /**
    * Returns an iterator over the contents of the result.
    */
-  def iterator: java.util.Iterator[T] with AutoCloseable =
+  def iterator: CloseableIterator[T] =
     buildIterator(destructive = false)
 
   /**
    * Returns an destructive iterator over the contents of the result.
    */
-  def destructiveIterator: java.util.Iterator[T] with AutoCloseable =
+  def destructiveIterator: CloseableIterator[T] =
     buildIterator(destructive = true)
 
-  private def buildIterator(destructive: Boolean): java.util.Iterator[T] with AutoCloseable = {
-    new java.util.Iterator[T] with AutoCloseable {
-      private[this] var iterator: CloseableIterator[T] = _
+  private def buildIterator(destructive: Boolean): CloseableIterator[T] = {
+    new CloseableIterator[T] {
+      private[this] var iter: CloseableIterator[T] = _
 
       private def initialize(): Unit = {
-        if (iterator == null) {
-          iterator = new ArrowDeserializingIterator(
+        if (iter == null) {
+          iter = new ArrowDeserializingIterator(
             createEncoder(encoder, schema),
             new ConcatenatingArrowStreamReader(
               allocator,
@@ -225,17 +225,17 @@ private[sql] class SparkResult[T](
 
       override def hasNext: Boolean = {
         initialize()
-        iterator.hasNext
+        iter.hasNext
       }
 
       override def next(): T = {
         initialize()
-        iterator.next()
+        iter.next()
       }
 
       override def close(): Unit = {
-        if (iterator != null) {
-          iterator.close()
+        if (iter != null) {
+          iter.close()
         }
       }
     }
@@ -246,7 +246,7 @@ private[sql] class SparkResult[T](
    */
   override def close(): Unit = cleaner.close()
 
-  override val cleaner: AutoCloseable = new SparkResultCloseable(resultMap)
+  override val cleaner: AutoCloseable = new SparkResultCloseable(resultMap, responses)
 
   private class ResultMessageIterator(destructive: Boolean) extends AbstractMessageIterator {
     private[this] var totalBytesRead = 0L
@@ -296,7 +296,12 @@ private[sql] class SparkResult[T](
   }
 }
 
-private[client] class SparkResultCloseable(resultMap: mutable.Map[Int, (Long, Seq[ArrowMessage])])
+private[client] class SparkResultCloseable(
+    resultMap: mutable.Map[Int, (Long, Seq[ArrowMessage])],
+    responses: CloseableIterator[proto.ExecutePlanResponse])
     extends AutoCloseable {
-  override def close(): Unit = resultMap.values.foreach(_._2.foreach(_.close()))
+  override def close(): Unit = {
+    resultMap.values.foreach(_._2.foreach(_.close()))
+    responses.close()
+  }
 }
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala
index 509ceffc552..55dd640f1b6 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.connect.client.CloseableIterator
 import org.apache.spark.sql.errors.{CompilationErrors, ExecutionErrors}
 import org.apache.spark.sql.types.Decimal
 
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderUtils.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderUtils.scala
index ed273369854..b9badc5c936 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderUtils.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderUtils.scala
@@ -40,8 +40,6 @@ private[arrow] object ArrowEncoderUtils {
   }
 }
 
-trait CloseableIterator[E] extends Iterator[E] with AutoCloseable
-
 private[arrow] object StructVectors {
   def unapply(v: AnyRef): Option[(StructVector, Seq[FieldVector])] = v match {
     case root: VectorSchemaRoot => Option((null, root.getFieldVectors.asScala.toSeq))
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala
index c4a2cfa8a85..9e67522711c 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.DefinedByConstructorParams
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._
 import org.apache.spark.sql.catalyst.util.{SparkDateTimeUtils, SparkIntervalUtils}
+import org.apache.spark.sql.connect.client.CloseableIterator
 import org.apache.spark.sql.errors.ExecutionErrors
 import org.apache.spark.sql.types.Decimal
 import org.apache.spark.sql.util.ArrowUtils
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 1403d460b51..98fbff84ba6 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -755,7 +755,7 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
 
   private def checkSameResult[E](expected: scala.collection.Seq[E], dataset: Dataset[E]): Unit = {
     dataset.withResult { result =>
-      assert(expected === result.iterator.asScala.toBuffer)
+      assert(expected === result.iterator.toBuffer)
     }
   }
 
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala
index dd0e9347ac8..7a8e8465a70 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_SECOND
 import org.apache.spark.sql.catalyst.util.IntervalStringStyles.ANSI_STYLE
 import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils._
 import org.apache.spark.sql.catalyst.util.SparkIntervalUtils._
+import org.apache.spark.sql.connect.client.CloseableIterator
 import org.apache.spark.sql.connect.client.arrow.FooEnum.FooEnum
 import org.apache.spark.sql.connect.client.util.ConnectFunSuite
 import org.apache.spark.sql.types.{ArrayType, DataType, DayTimeIntervalType, Decimal, DecimalType, IntegerType, Metadata, SQLUserDefinedType, StructType, UserDefinedType, YearMonthIntervalType}

