This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 10b83deaf94d [SPARK-51866][CONNECT][TESTS] Ensure
`serializerAllocator/deserializerAllocator` are closed if
`ArrowEncoderSuite#roundTripWithDifferentIOEncoders` fails to create
`CloseableIterator`
10b83deaf94d is described below
commit 10b83deaf94d2d63f8d64215c2533e8ab870f875
Author: yangjie01 <[email protected]>
AuthorDate: Wed Apr 23 10:48:20 2025 +0800
[SPARK-51866][CONNECT][TESTS] Ensure
`serializerAllocator/deserializerAllocator` are closed if
`ArrowEncoderSuite#roundTripWithDifferentIOEncoders` fails to create
`CloseableIterator`
### What changes were proposed in this pull request?
This pull request ensures that `serializerAllocator` and
`deserializerAllocator` are closed when the creation of `CloseableIterator` by
`ArrowEncoderSuite#roundTripWithDifferentIOEncoders` fails.
### Why are the changes needed?
When adding the test options `(Test / javaOptions) +=
"-Darrow.memory.debug.allocator=true",` for the `connect-client-jvm` module,
`ArrowEncoderSuite` will throw the following error:
```
[info] org.apache.spark.sql.connect.client.arrow.ArrowEncoderSuite ***
ABORTED *** (3 seconds, 446 milliseconds)
[info] java.lang.IllegalStateException: Allocator[ROOT] closed with
outstanding child allocators.
[info] Allocator(ROOT) 0/0/574720/9223372036854775807
(res/actual/peak/limit)
[info] child allocators: 2
[info] Allocator(serialization) 0/0/0/9223372036854775807
(res/actual/peak/limit)
[info] child allocators: 0
[info] ledgers: 0
[info] reservations: 0
[info] Allocator(deserialization) 0/0/0/9223372036854775807
(res/actual/peak/limit)
[info] child allocators: 0
[info] ledgers: 0
[info] reservations: 0
[info] ledgers: 0
[info] reservations: 0
[info] at
org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:462)
[info] at
org.apache.arrow.memory.RootAllocator.close(RootAllocator.java:27)
[info] at
org.apache.spark.sql.connect.client.arrow.ArrowEncoderSuite.afterAll(ArrowEncoderSuite.scala:62)
[info] at
org.scalatest.BeforeAndAfterAll.$anonfun$run$1(BeforeAndAfterAll.scala:225)
[info] at
org.scalatest.Status.$anonfun$withAfterEffect$1(Status.scala:377)
[info] at
org.scalatest.Status.$anonfun$withAfterEffect$1$adapted(Status.scala:373)
[info] at org.scalatest.CompositeStatus.whenCompleted(Status.scala:962)
[info] at org.scalatest.Status.withAfterEffect(Status.scala:373)
[info] at org.scalatest.Status.withAfterEffect$(Status.scala:371)
[info] at org.scalatest.CompositeStatus.withAfterEffect(Status.scala:863)
[info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:224)
[info] at
org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info] at
org.apache.spark.sql.connect.client.arrow.ArrowEncoderSuite.run(ArrowEncoderSuite.scala:53)
[info] at
org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info] at
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info] at
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info] at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
[info] Run completed in 5 seconds, 568 milliseconds.
[info] Total number of tests run: 108
[info] Suites: completed 0, aborted 1
[info] Tests: succeeded 108, failed 0, canceled 0, ignored 0, pending 0
[info] *** 1 SUITE ABORTED ***
[error] Error during tests:
[error] org.apache.spark.sql.connect.client.arrow.ArrowEncoderSuite
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Pass GitHub Actions
- Locally confirmed that when adding the test parameter `(Test /
javaOptions) += "-Darrow.memory.debug.allocator=true",` for the
`connect-client-jvm` module, the aforementioned error message is no longer
thrown.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #50664 from LuciferYang/Fix-ArrowEncoderSuite.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
(cherry picked from commit d7ce6ef55e5fc42a1a852710bff59a842bb767f6)
Signed-off-by: yangjie01 <[email protected]>
---
.../connect/client/arrow/ArrowEncoderSuite.scala | 67 ++++++++++++----------
1 file changed, 38 insertions(+), 29 deletions(-)
diff --git
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala
index 58e19389cae2..75816a835aaa 100644
---
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala
+++
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala
@@ -99,40 +99,49 @@ class ArrowEncoderSuite extends ConnectFunSuite with
BeforeAndAfterAll {
val serializerAllocator = newAllocator("serialization")
val deserializerAllocator = newAllocator("deserialization")
- val arrowIterator = ArrowSerializer.serialize(
- input = iterator,
- enc = inputEncoder,
- allocator = serializerAllocator,
- maxRecordsPerBatch = maxRecordsPerBatch,
- maxBatchSize = maxBatchSize,
- batchSizeCheckInterval = batchSizeCheckInterval,
- timeZoneId = "UTC",
- largeVarTypes = false)
+ try {
+ val arrowIterator = ArrowSerializer.serialize(
+ input = iterator,
+ enc = inputEncoder,
+ allocator = serializerAllocator,
+ maxRecordsPerBatch = maxRecordsPerBatch,
+ maxBatchSize = maxBatchSize,
+ batchSizeCheckInterval = batchSizeCheckInterval,
+ timeZoneId = "UTC",
+ largeVarTypes = false)
- val inspectedIterator = if (inspectBatch != null) {
- arrowIterator.map { batch =>
- inspectBatch(batch)
- batch
+ val inspectedIterator = if (inspectBatch != null) {
+ arrowIterator.map { batch =>
+ inspectBatch(batch)
+ batch
+ }
+ } else {
+ arrowIterator
}
- } else {
- arrowIterator
- }
- val resultIterator =
- ArrowDeserializers.deserializeFromArrow(
- inspectedIterator,
- outputEncoder,
- deserializerAllocator,
- timeZoneId = "UTC")
- new CloseableIterator[O] {
- override def close(): Unit = {
- arrowIterator.close()
- resultIterator.close()
+ val resultIterator =
+ ArrowDeserializers.deserializeFromArrow(
+ inspectedIterator,
+ outputEncoder,
+ deserializerAllocator,
+ timeZoneId = "UTC")
+ new CloseableIterator[O] {
+ override def close(): Unit = {
+ arrowIterator.close()
+ resultIterator.close()
+ serializerAllocator.close()
+ deserializerAllocator.close()
+ }
+
+ override def hasNext: Boolean = resultIterator.hasNext
+
+ override def next(): O = resultIterator.next()
+ }
+ } catch {
+ case e: Throwable =>
serializerAllocator.close()
deserializerAllocator.close()
- }
- override def hasNext: Boolean = resultIterator.hasNext
- override def next(): O = resultIterator.next()
+ throw e
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]