This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 259ac250017 [SPARK-45814][CONNECT][SQL][3.4] Make ArrowConverters.createEmptyArrowBatch call close() to avoid memory leak
259ac250017 is described below

commit 259ac250017bcc1805f6cb44a5e7eedf9e552a98
Author: xieshuaihu <xieshua...@agora.io>
AuthorDate: Fri Nov 10 12:33:24 2023 +0800

    [SPARK-45814][CONNECT][SQL][3.4] Make ArrowConverters.createEmptyArrowBatch call close() to avoid memory leak
    
    ### What changes were proposed in this pull request?
    
    Make `ArrowBatchIterator` implement `AutoCloseable` and `ArrowConverters.createEmptyArrowBatch()` call `close()` to avoid a memory leak.
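    
    In condensed form, the idea is to give the iterator one explicit cleanup path that callers can invoke even when no `TaskContext` is available. The sketch below is a hedged, self-contained illustration with made-up names, not the actual Spark code; the real patch reuses the existing task-completion listener and `Utils.tryWithSafeFinally`, as shown in the diff at the end of this message.
    
    ```scala
    import org.apache.arrow.memory.{BufferAllocator, RootAllocator}
    
    // Illustrative stand-in for ArrowBatchIterator: it owns off-heap Arrow memory
    // and exposes a single close() shared by hasNext, listeners, and callers.
    class BatchIterator(parent: BufferAllocator)
        extends Iterator[Array[Byte]] with AutoCloseable {
      private val allocator = parent.newChildAllocator("batch", 0, Long.MaxValue)
      private var done = false
    
      override def hasNext: Boolean = !done || { close(); false }
      override def next(): Array[Byte] = { done = true; Array.emptyByteArray }
      override def close(): Unit = allocator.close()
    }
    
    object Demo extends App {
      val root = new RootAllocator(Long.MaxValue)
      val batches = new BatchIterator(root)
      // Close explicitly, mirroring what the patch does when TaskContext.get() is null.
      try batches.next() finally batches.close()
      root.close()
    }
    ```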
    
    ### Why are the changes needed?
    
    `ArrowConverters.createEmptyArrowBatch` doesn't call `super.hasNext`; if `TaskContext.get` returns `None`, the memory allocated in `ArrowBatchIterator` is leaked.
    
    In Spark Connect, `createEmptyArrowBatch` is called in [SparkConnectPlanner](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L2558) and [SparkConnectPlanExecution](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala#L224), which causes a long-running driver to consume all off-heap memory sp [...]
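    
    A minimal, hypothetical reproduction of the pre-fix control flow (names are invented for illustration; the real allocations go through `ArrowWriter` and a `VectorSchemaRoot`): cleanup is only reachable through `hasNext` or a task-completion listener, so an anonymous subclass that pins `hasNext` to `true` and runs on a driver thread without a `TaskContext` never releases its allocator.
    
    ```scala
    import org.apache.arrow.memory.RootAllocator
    
    object LeakDemo extends App {
      val root = new RootAllocator(Long.MaxValue)
    
      class BatchIterator extends Iterator[Array[Byte]] {
        private val allocator = root.newChildAllocator("batch", 0, Long.MaxValue)
        // Cleanup only runs once the input is exhausted (or via a TaskContext listener).
        override def hasNext: Boolean = { allocator.close(); false }
        override def next(): Array[Byte] = Array.emptyByteArray
      }
    
      // createEmptyArrowBatch-style caller: hasNext is forced to true and there is
      // no TaskContext on the Connect driver, so close is never reached.
      new BatchIterator { override def hasNext: Boolean = true }.next()
    
      // The child allocator is still open; Arrow's leak detection reports it when
      // the root allocator is eventually closed.
      root.close()
    }
    ```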
    
    This is the exception stack:
    ```
    org.apache.arrow.memory.OutOfMemoryException: Failure allocating buffer.
            at io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:67)
            at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77)
            at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84)
            at org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34)
            at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:354)
            at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:349)
            at org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:337)
            at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:315)
            at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:279)
            at org.apache.arrow.vector.BaseValueVector.allocFixedDataAndValidityBufs(BaseValueVector.java:192)
            at org.apache.arrow.vector.BaseFixedWidthVector.allocateBytes(BaseFixedWidthVector.java:338)
            at org.apache.arrow.vector.BaseFixedWidthVector.allocateNew(BaseFixedWidthVector.java:308)
            at org.apache.arrow.vector.BaseFixedWidthVector.allocateNew(BaseFixedWidthVector.java:273)
            at org.apache.spark.sql.execution.arrow.ArrowWriter$.$anonfun$create$1(ArrowWriter.scala:44)
            at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
            at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
            at scala.collection.convert.JavaCollectionWrappers$JListWrapper.map(JavaCollectionWrappers.scala:103)
            at org.apache.spark.sql.execution.arrow.ArrowWriter$.create(ArrowWriter.scala:43)
            at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchIterator.<init>(ArrowConverters.scala:93)
            at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.<init>(ArrowConverters.scala:138)
            at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.<init>(ArrowConverters.scala:231)
            at org.apache.spark.sql.execution.arrow.ArrowConverters$.createEmptyArrowBatch(ArrowConverters.scala:229)
            at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleSqlCommand(SparkConnectPlanner.scala:2481)
            at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2426)
            at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:202)
            at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:158)
            at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:132)
            at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:189)
            at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
            at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:189)
            at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
            at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContextClassLoader$1(SessionHolder.scala:176)
            at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:178)
            at org.apache.spark.sql.connect.service.SessionHolder.withContextClassLoader(SessionHolder.scala:175)
            at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:188)
            at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:132)
            at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:84)
            at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:228)
    Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 4194304 byte(s) of direct memory (used: 1069547799, max: 1073741824)
            at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:845)
            at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:774)
            at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:721)
            at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:696)
            at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:215)
            at io.netty.buffer.PoolArena.tcacheAllocateSmall(PoolArena.java:180)
            at io.netty.buffer.PoolArena.allocate(PoolArena.java:137)
            at io.netty.buffer.PoolArena.allocate(PoolArena.java:129)
            at io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.newDirectBufferL(PooledByteBufAllocatorL.java:181)
            at io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.directBuffer(PooledByteBufAllocatorL.java:214)
            at io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:58)
            ... 37 more
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Manually tested.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43728 from xieshuaihu/3_4_SPARK-45814.
    
    Authored-by: xieshuaihu <xieshua...@agora.io>
    Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 .../sql/execution/arrow/ArrowConverters.scala      | 25 ++++++++++++++++------
 1 file changed, 18 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index df26a06c86d..aa018efc1f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -77,7 +77,7 @@ private[sql] object ArrowConverters extends Logging {
       schema: StructType,
       maxRecordsPerBatch: Long,
       timeZoneId: String,
-      context: TaskContext) extends Iterator[Array[Byte]] {
+      context: TaskContext) extends Iterator[Array[Byte]] with AutoCloseable {
 
     protected val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
     private val allocator =
@@ -89,13 +89,11 @@ private[sql] object ArrowConverters extends Logging {
     protected val arrowWriter = ArrowWriter.create(root)
 
     Option(context).foreach {_.addTaskCompletionListener[Unit] { _ =>
-      root.close()
-      allocator.close()
+      close()
     }}
 
     override def hasNext: Boolean = rowIter.hasNext || {
-      root.close()
-      allocator.close()
+      close()
       false
     }
 
@@ -120,6 +118,11 @@ private[sql] object ArrowConverters extends Logging {
 
       out.toByteArray
     }
+
+    override def close(): Unit = {
+      root.close()
+      allocator.close()
+    }
   }
 
   private[sql] class ArrowBatchWithSchemaIterator(
@@ -217,10 +220,18 @@ private[sql] object ArrowConverters extends Logging {
   private[sql] def createEmptyArrowBatch(
       schema: StructType,
       timeZoneId: String): Array[Byte] = {
-    new ArrowBatchWithSchemaIterator(
+    val batches = new ArrowBatchWithSchemaIterator(
         Iterator.empty, schema, 0L, 0L, timeZoneId, TaskContext.get) {
       override def hasNext: Boolean = true
-    }.next()
+    }
+    Utils.tryWithSafeFinally {
+      batches.next()
+    } {
+      // If taskContext is null, `batches.close()` should be called to avoid memory leak.
+      if (TaskContext.get() == null) {
+        batches.close()
+      }
+    }
   }
 
   /**

