This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new d18305d5e93  [SPARK-44656][CONNECT] Make all iterators CloseableIterators
d18305d5e93 is described below

commit d18305d5e9312a438317b9b2ff800f2c074e3917
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Fri Aug 4 16:06:57 2023 +0200

    [SPARK-44656][CONNECT] Make all iterators CloseableIterators

    ### What changes were proposed in this pull request?

    This makes sure that all iterators used in the Spark Connect Scala client are `CloseableIterator`:

    1. Makes `CustomSparkConnectBlockingStub.executePlan` return `CloseableIterator` and makes all wrappers respect that.
    2. Makes `ExecutePlanResponseReattachableIterator` a `CloseableIterator`, with an implementation that informs the server via ReleaseExecute that the query result can be released.
    3. Makes `SparkResult.iterator` explicitly a `CloseableIterator`, and also registers the `SparkResult.responses` iterator with the `SparkResultCloseable` cleaner, so that it is closed upon GC if not closed explicitly sooner.
    4. Because `Dataset.toLocalIterator` requires a Java iterator, implements a conversion to `java.util.Iterator with AutoCloseable` to be returned there.
    5. Using `CloseableIterator` consistently everywhere else removes the need to convert between iterator types.

    ### Why are the changes needed?

    Properly closeable iterators are needed for resource management and, with reattachable execution, to inform the server that processing has finished.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Exercised by the current E2E tests.

    Co-authored-by: Alice Sayutina <alice.sayutinadatabricks.com>

    Closes #42331 from juliuszsompolski/closeable_iterators.

    Lead-authored-by: Juliusz Sompolski <ju...@databricks.com>
    Co-authored-by: Alice Sayutina <alice.sayut...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
    (cherry picked from commit 84ea6f242e4982187edc0a8f5786e7dc69ec31d7)
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  4 +-
 .../scala/org/apache/spark/sql/SparkSession.scala  | 21 ++++------
 .../sql/connect/client/CloseableIterator.scala     | 46 ++++++++++++++++++++++
 .../client/CustomSparkConnectBlockingStub.scala    | 10 +++--
 .../ExecutePlanResponseReattachableIterator.scala  | 34 +++++++++-------
 .../connect/client/GrpcExceptionConverter.scala    | 10 ++++-
 .../sql/connect/client/GrpcRetryHandler.scala      | 24 ++++++-----
 .../sql/connect/client/SparkConnectClient.scala    |  7 ++--
 .../spark/sql/connect/client/SparkResult.scala     | 37 +++++++++--------
 .../connect/client/arrow/ArrowDeserializer.scala   |  1 +
 .../connect/client/arrow/ArrowEncoderUtils.scala   |  2 -
 .../sql/connect/client/arrow/ArrowSerializer.scala |  1 +
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  |  2 +-
 .../connect/client/arrow/ArrowEncoderSuite.scala   |  1 +
 14 files changed, 133 insertions(+), 67 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala index 0f7b376955c..8a7dce3987a 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2832,7 +2832,7 @@ class Dataset[T] private[sql] ( /** * Returns an iterator that contains all rows in this Dataset. * - * The returned iterator implements [[AutoCloseable]].
For memory management it is better to + * The returned iterator implements [[AutoCloseable]]. For resource management it is better to * close it once you are done. If you don't close it, it and the underlying data will be cleaned * up once the iterator is garbage collected. * @@ -2840,7 +2840,7 @@ class Dataset[T] private[sql] ( * @since 3.4.0 */ def toLocalIterator(): java.util.Iterator[T] = { - collectResult().destructiveIterator + collectResult().destructiveIterator.asJava } /** diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index 59f3f3526ab..355d7edadc7 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -252,10 +252,8 @@ class SparkSession private[sql] ( .setSql(sqlText) .addAllPosArgs(args.map(toLiteralProto).toIterable.asJava))) val plan = proto.Plan.newBuilder().setCommand(cmd) - val responseSeq = client.execute(plan.build()).asScala.toSeq - - // sequence is a lazy stream, force materialize it to make sure it is consumed. - responseSeq.foreach(_ => ()) + // .toBuffer forces that the iterator is consumed and closed + val responseSeq = client.execute(plan.build()).toBuffer.toSeq val response = responseSeq .find(_.hasSqlCommandResult) @@ -311,10 +309,8 @@ class SparkSession private[sql] ( .setSql(sqlText) .putAllArgs(args.asScala.mapValues(toLiteralProto).toMap.asJava))) val plan = proto.Plan.newBuilder().setCommand(cmd) - val responseSeq = client.execute(plan.build()).asScala.toSeq - - // sequence is a lazy stream, force materialize it to make sure it is consumed. - responseSeq.foreach(_ => ()) + // .toBuffer forces that the iterator is consumed and closed + val responseSeq = client.execute(plan.build()).toBuffer.toSeq val response = responseSeq .find(_.hasSqlCommandResult) @@ -548,15 +544,14 @@ class SparkSession private[sql] ( f(builder) builder.getCommonBuilder.setPlanId(planIdGenerator.getAndIncrement()) val plan = proto.Plan.newBuilder().setRoot(builder).build() - client.execute(plan).asScala.foreach(_ => ()) + // .toBuffer forces that the iterator is consumed and closed + client.execute(plan).toBuffer } private[sql] def execute(command: proto.Command): Seq[ExecutePlanResponse] = { val plan = proto.Plan.newBuilder().setCommand(command).build() - val seq = client.execute(plan).asScala.toSeq - // sequence is a lazy stream, force materialize it to make sure it is consumed. - seq.foreach(_ => ()) - seq + // .toBuffer forces that the iterator is consumed and closed + client.execute(plan).toBuffer.toSeq } private[sql] def registerUdf(udf: proto.CommonInlineUserDefinedFunction): Unit = { diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CloseableIterator.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CloseableIterator.scala new file mode 100644 index 00000000000..891e50ed6e7 --- /dev/null +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CloseableIterator.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.client + +private[sql] trait CloseableIterator[E] extends Iterator[E] with AutoCloseable { self => + def asJava: java.util.Iterator[E] = new java.util.Iterator[E] with AutoCloseable { + override def next() = self.next() + + override def hasNext() = self.hasNext + + override def close() = self.close() + } +} + +private[sql] object CloseableIterator { + + /** + * Wrap iterator to get CloseableIterator, if it wasn't closeable already. + */ + def apply[T](iterator: Iterator[T]): CloseableIterator[T] = iterator match { + case closeable: CloseableIterator[T] => closeable + case _ => + new CloseableIterator[T] { + override def next(): T = iterator.next() + + override def hasNext(): Boolean = iterator.hasNext + + override def close() = { /* empty */ } + } + } +} diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala index bb20901eade..73ff01e223f 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql.connect.client +import scala.collection.JavaConverters._ + import io.grpc.ManagedChannel import org.apache.spark.connect.proto._ @@ -27,15 +29,17 @@ private[client] class CustomSparkConnectBlockingStub( private val stub = SparkConnectServiceGrpc.newBlockingStub(channel) private val retryHandler = new GrpcRetryHandler(retryPolicy) - def executePlan(request: ExecutePlanRequest): java.util.Iterator[ExecutePlanResponse] = { + def executePlan(request: ExecutePlanRequest): CloseableIterator[ExecutePlanResponse] = { GrpcExceptionConverter.convert { GrpcExceptionConverter.convertIterator[ExecutePlanResponse]( - retryHandler.RetryIterator(request, stub.executePlan)) + retryHandler.RetryIterator[ExecutePlanRequest, ExecutePlanResponse]( + request, + r => CloseableIterator(stub.executePlan(r).asScala))) } } def executePlanReattachable( - request: ExecutePlanRequest): java.util.Iterator[ExecutePlanResponse] = { + request: ExecutePlanRequest): CloseableIterator[ExecutePlanResponse] = { GrpcExceptionConverter.convert { GrpcExceptionConverter.convertIterator[ExecutePlanResponse]( // Don't use retryHandler - own retry handling is inside.
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala index 41648c3c100..d412d9b5770 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala @@ -50,7 +50,7 @@ class ExecutePlanResponseReattachableIterator( request: proto.ExecutePlanRequest, channel: ManagedChannel, retryPolicy: GrpcRetryHandler.RetryPolicy) - extends java.util.Iterator[proto.ExecutePlanResponse] + extends CloseableIterator[proto.ExecutePlanResponse] with Logging { val operationId = if (request.hasOperationId) { @@ -90,8 +90,8 @@ class ExecutePlanResponseReattachableIterator( // Initial iterator comes from ExecutePlan request. // Note: This is not retried, because no error would ever be thrown here, and GRPC will only - // throw error on first iterator.hasNext() or iterator.next() - private var iterator: java.util.Iterator[proto.ExecutePlanResponse] = + // throw error on first iter.hasNext() or iter.next() + private var iter: java.util.Iterator[proto.ExecutePlanResponse] = rawBlockingStub.executePlan(initialRequest) override def next(): proto.ExecutePlanResponse = synchronized { @@ -105,11 +105,11 @@ class ExecutePlanResponseReattachableIterator( var firstTry = true val ret = retry { if (firstTry) { - // on first try, we use the existing iterator. + // on first try, we use the existing iter. firstTry = false } else { - // on retry, the iterator is borked, so we need a new one - iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest()) + // on retry, the iter is borked, so we need a new one + iter = rawBlockingStub.reattachExecute(createReattachExecuteRequest()) } callIter(_.next()) } @@ -138,23 +138,23 @@ class ExecutePlanResponseReattachableIterator( try { retry { if (firstTry) { - // on first try, we use the existing iterator. + // on first try, we use the existing iter. firstTry = false } else { - // on retry, the iterator is borked, so we need a new one - iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest()) + // on retry, the iter is borked, so we need a new one + iter = rawBlockingStub.reattachExecute(createReattachExecuteRequest()) } var hasNext = callIter(_.hasNext()) // Graceful reattach: - // If iterator ended, but there was no ResultComplete, it means that there is more, + // If iter ended, but there was no ResultComplete, it means that there is more, // and we need to reattach. if (!hasNext && !resultComplete) { do { - iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest()) + iter = rawBlockingStub.reattachExecute(createReattachExecuteRequest()) assert(!resultComplete) // shouldn't change... hasNext = callIter(_.hasNext()) - // It's possible that the new iterator will be empty, so we need to loop to get another. - // Eventually, there will be a non empty iterator, because there is always a + // It's possible that the new iter will be empty, so we need to loop to get another. + // Eventually, there will be a non empty iter, because there is always a // ResultComplete inserted by the server at the end of the stream. 
} while (!hasNext) } @@ -167,6 +167,10 @@ class ExecutePlanResponseReattachableIterator( } } + override def close(): Unit = { + releaseAll() + } + /** * Inform the server to release the buffered execution results until and including given result. * @@ -204,7 +208,7 @@ class ExecutePlanResponseReattachableIterator( */ private def callIter[V](iterFun: java.util.Iterator[proto.ExecutePlanResponse] => V) = { try { - iterFun(iterator) + iterFun(iter) } catch { case ex: StatusRuntimeException if StatusProto @@ -217,7 +221,7 @@ class ExecutePlanResponseReattachableIterator( ex) } // Try a new ExecutePlan, and throw upstream for retry. - iterator = rawBlockingStub.executePlan(initialRequest) + iter = rawBlockingStub.executePlan(initialRequest) throw new GrpcRetryHandler.RetryException } } diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala index 1a42ec821d8..7ff3421a5a0 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala @@ -31,8 +31,8 @@ private[client] object GrpcExceptionConverter { } } - def convertIterator[T](iter: java.util.Iterator[T]): java.util.Iterator[T] = { - new java.util.Iterator[T] { + def convertIterator[T](iter: CloseableIterator[T]): CloseableIterator[T] = { + new CloseableIterator[T] { override def hasNext: Boolean = { convert { iter.hasNext @@ -44,6 +44,12 @@ private[client] object GrpcExceptionConverter { iter.next() } } + + override def close(): Unit = { + convert { + iter.close() + } + } } } diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala index 47ff975b267..6dad5b4b3a9 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala @@ -45,13 +45,13 @@ private[client] class GrpcRetryHandler(private val retryPolicy: GrpcRetryHandler * @tparam U * The type of the response. */ - class RetryIterator[T, U](request: T, call: T => java.util.Iterator[U]) - extends java.util.Iterator[U] { + class RetryIterator[T, U](request: T, call: T => CloseableIterator[U]) + extends CloseableIterator[U] { private var opened = false // we only retry if it fails on first call when using the iterator - private var iterator = call(request) + private var iter = call(request) - private def retryIter[V](f: java.util.Iterator[U] => V) = { + private def retryIter[V](f: Iterator[U] => V) = { if (!opened) { opened = true var firstTry = true @@ -61,26 +61,30 @@ private[client] class GrpcRetryHandler(private val retryPolicy: GrpcRetryHandler firstTry = false } else { // on retry, we need to call the RPC again. 
- iterator = call(request) + iter = call(request) } - f(iterator) + f(iter) } } else { - f(iterator) + f(iter) } } override def next: U = { - retryIter(_.next()) + retryIter(_.next) } override def hasNext: Boolean = { - retryIter(_.hasNext()) + retryIter(_.hasNext) + } + + override def close(): Unit = { + iter.close() } } object RetryIterator { - def apply[T, U](request: T, call: T => java.util.Iterator[U]): RetryIterator[T, U] = + def apply[T, U](request: T, call: T => CloseableIterator[U]): RetryIterator[T, U] = new RetryIterator(request, call) } diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala index 3d20be88888..a028df536cf 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala @@ -75,10 +75,11 @@ private[sql] class SparkConnectClient( /** * Execute the plan and return response iterator. * - * It returns an open iterator. The caller needs to ensure that this iterator is fully consumed, - * otherwise resources held by a re-attachable query may be left dangling until server timeout. + * It returns CloseableIterator. For resource management it is better to close it once you are + * done. If you don't close it, it and the underlying data will be cleaned up once the iterator + * is garbage collected. */ - def execute(plan: proto.Plan): java.util.Iterator[proto.ExecutePlanResponse] = { + def execute(plan: proto.Plan): CloseableIterator[proto.ExecutePlanResponse] = { artifactManager.uploadAllClassFileArtifacts() val request = proto.ExecutePlanRequest .newBuilder() diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala index 93c32aa2954..609e84779fb 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala @@ -27,14 +27,14 @@ import org.apache.arrow.vector.types.pojo import org.apache.spark.connect.proto import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, RowEncoder} import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ProductEncoder, UnboundRowEncoder} -import org.apache.spark.sql.connect.client.arrow.{AbstractMessageIterator, ArrowDeserializingIterator, CloseableIterator, ConcatenatingArrowStreamReader, MessageIterator} +import org.apache.spark.sql.connect.client.arrow.{AbstractMessageIterator, ArrowDeserializingIterator, ConcatenatingArrowStreamReader, MessageIterator} import org.apache.spark.sql.connect.client.util.Cleanable import org.apache.spark.sql.connect.common.DataTypeProtoConverter import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql.util.ArrowUtils private[sql] class SparkResult[T]( - responses: java.util.Iterator[proto.ExecutePlanResponse], + responses: CloseableIterator[proto.ExecutePlanResponse], allocator: BufferAllocator, encoder: AgnosticEncoder[T], timeZoneId: String) @@ -198,22 +198,22 @@ private[sql] class SparkResult[T]( /** * Returns an iterator over the contents of the result. 
*/ - def iterator: java.util.Iterator[T] with AutoCloseable = + def iterator: CloseableIterator[T] = buildIterator(destructive = false) /** * Returns a destructive iterator over the contents of the result. */ - def destructiveIterator: java.util.Iterator[T] with AutoCloseable = + def destructiveIterator: CloseableIterator[T] = buildIterator(destructive = true) - private def buildIterator(destructive: Boolean): java.util.Iterator[T] with AutoCloseable = { - new java.util.Iterator[T] with AutoCloseable { - private[this] var iterator: CloseableIterator[T] = _ + private def buildIterator(destructive: Boolean): CloseableIterator[T] = { + new CloseableIterator[T] { + private[this] var iter: CloseableIterator[T] = _ private def initialize(): Unit = { - if (iterator == null) { - iterator = new ArrowDeserializingIterator( + if (iter == null) { + iter = new ArrowDeserializingIterator( createEncoder(encoder, schema), new ConcatenatingArrowStreamReader( allocator, @@ -225,17 +225,17 @@ private[sql] class SparkResult[T]( override def hasNext: Boolean = { initialize() - iterator.hasNext + iter.hasNext } override def next(): T = { initialize() - iterator.next() + iter.next() } override def close(): Unit = { - if (iterator != null) { - iterator.close() + if (iter != null) { + iter.close() } } } @@ -246,7 +246,7 @@ private[sql] class SparkResult[T]( */ override def close(): Unit = cleaner.close() - override val cleaner: AutoCloseable = new SparkResultCloseable(resultMap) + override val cleaner: AutoCloseable = new SparkResultCloseable(resultMap, responses) private class ResultMessageIterator(destructive: Boolean) extends AbstractMessageIterator { private[this] var totalBytesRead = 0L @@ -296,7 +296,12 @@ private[sql] class SparkResult[T]( } } -private[client] class SparkResultCloseable(resultMap: mutable.Map[Int, (Long, Seq[ArrowMessage])]) +private[client] class SparkResultCloseable( + resultMap: mutable.Map[Int, (Long, Seq[ArrowMessage])], + responses: CloseableIterator[proto.ExecutePlanResponse]) extends AutoCloseable { - override def close(): Unit = resultMap.values.foreach(_._2.foreach(_.close())) + override def close(): Unit = { + resultMap.values.foreach(_._2.foreach(_.close())) + responses.close() + } } diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala index 509ceffc552..55dd640f1b6 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._ import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema +import org.apache.spark.sql.connect.client.CloseableIterator import org.apache.spark.sql.errors.{CompilationErrors, ExecutionErrors} import org.apache.spark.sql.types.Decimal diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderUtils.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderUtils.scala index ed273369854..b9badc5c936 100644 ---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderUtils.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderUtils.scala @@ -40,8 +40,6 @@ private[arrow] object ArrowEncoderUtils { } } -trait CloseableIterator[E] extends Iterator[E] with AutoCloseable - private[arrow] object StructVectors { def unapply(v: AnyRef): Option[(StructVector, Seq[FieldVector])] = v match { case root: VectorSchemaRoot => Option((null, root.getFieldVectors.asScala.toSeq)) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala index c4a2cfa8a85..9e67522711c 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.DefinedByConstructorParams import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._ import org.apache.spark.sql.catalyst.util.{SparkDateTimeUtils, SparkIntervalUtils} +import org.apache.spark.sql.connect.client.CloseableIterator import org.apache.spark.sql.errors.ExecutionErrors import org.apache.spark.sql.types.Decimal import org.apache.spark.sql.util.ArrowUtils diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala index 1403d460b51..98fbff84ba6 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala @@ -755,7 +755,7 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM private def checkSameResult[E](expected: scala.collection.Seq[E], dataset: Dataset[E]): Unit = { dataset.withResult { result => - assert(expected === result.iterator.asScala.toBuffer) + assert(expected === result.iterator.toBuffer) } } diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala index dd0e9347ac8..7a8e8465a70 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_SECOND import org.apache.spark.sql.catalyst.util.IntervalStringStyles.ANSI_STYLE import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils._ import org.apache.spark.sql.catalyst.util.SparkIntervalUtils._ +import org.apache.spark.sql.connect.client.CloseableIterator import org.apache.spark.sql.connect.client.arrow.FooEnum.FooEnum import org.apache.spark.sql.connect.client.util.ConnectFunSuite import org.apache.spark.sql.types.{ArrayType, DataType, DayTimeIntervalType, Decimal, DecimalType, IntegerType, Metadata, SQLUserDefinedType, StructType, UserDefinedType, YearMonthIntervalType} 
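For readers who want to see the pattern outside of the diff, below is a minimal, self-contained sketch of the `CloseableIterator` idiom this commit introduces. The trait and companion object mirror the new `org.apache.spark.sql.connect.client.CloseableIterator` shown above; the `LineIterator` class, the `CloseableIteratorExample` object, and the use of `BufferedReader` are illustrative assumptions standing in for the gRPC response stream, not code from the commit.

```scala
import java.io.{BufferedReader, StringReader}

// Mirrors the trait added by this commit: a Scala Iterator that is also AutoCloseable.
trait CloseableIterator[E] extends Iterator[E] with AutoCloseable { self =>
  // Java callers (e.g. Dataset.toLocalIterator) get a java.util.Iterator that stays closeable.
  def asJava: java.util.Iterator[E] = new java.util.Iterator[E] with AutoCloseable {
    override def hasNext(): Boolean = self.hasNext
    override def next(): E = self.next()
    override def close(): Unit = self.close()
  }
}

object CloseableIterator {
  // Wrap a plain iterator; close() becomes a no-op unless the input was already closeable.
  def apply[T](iterator: Iterator[T]): CloseableIterator[T] = iterator match {
    case closeable: CloseableIterator[T] => closeable
    case _ =>
      new CloseableIterator[T] {
        override def hasNext: Boolean = iterator.hasNext
        override def next(): T = iterator.next()
        override def close(): Unit = ()
      }
  }
}

object CloseableIteratorExample {
  // Hypothetical iterator over the lines of a reader that releases the reader when closed,
  // standing in for the response stream that SparkResult holds on to.
  private class LineIterator(reader: BufferedReader) extends CloseableIterator[String] {
    private var nextLine: String = reader.readLine()
    override def hasNext: Boolean = nextLine != null
    override def next(): String = {
      val line = nextLine
      nextLine = reader.readLine()
      line
    }
    override def close(): Unit = reader.close()
  }

  def main(args: Array[String]): Unit = {
    val lines: CloseableIterator[String] =
      new LineIterator(new BufferedReader(new StringReader("a\nb\nc")))
    try {
      // Consuming fully is what SparkSession.sql does via .toBuffer; closing releases the
      // underlying resource eagerly instead of waiting for GC (or, server-side, for a timeout).
      lines.foreach(println)
    } finally {
      lines.close()
    }
  }
}
```

The design choice in the commit is the same as in this sketch: explicit `close()` is preferred, but a GC-triggered fallback (the `SparkResultCloseable` cleaner) still releases resources if the caller never closes the iterator.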
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org