Zhen Wang created SPARK-54934:
---------------------------------

             Summary: Memory Leak Caused by thread-unsafe 
UnsafeInMemorySorter.freeMemory 
                 Key: SPARK-54934
                 URL: https://issues.apache.org/jira/browse/SPARK-54934
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 3.5.1, 4.2.0
            Reporter: Zhen Wang


I encountered a {*}SparkOutOfMemoryError{*}. The logs show that memory is still held by {*}UnsafeExternalSorter{*}, even though UnsafeExternalSorter is supposed to trigger spilling and release that memory under pressure. *UnsafeInMemorySorter.freeMemory* is not thread-safe, while *UnsafeExternalSorter.spill* may be called concurrently from multiple threads, which can corrupt the sorter's memory accounting and leak memory. Related logs:
{code:java}
26/01/05 23:04:53 INFO UnsafeExternalSorter: Thread 152 spilling sort data of 2.1 GiB to disk (0 time so far)
26/01/05 23:05:10 INFO UnsafeExternalSorter: Thread 152 spilling sort data of 303.0 KiB to disk (0 time so far)
26/01/05 23:05:10 INFO TaskMemoryManager: Memory used in task 172030
26/01/05 23:05:10 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@72075707: 303.0 KiB
26/01/05 23:05:10 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@3e02e759: 2.1 GiB
26/01/05 23:05:10 INFO TaskMemoryManager: 0 bytes of memory were used by task 172030 but are not associated with specific consumers
26/01/05 23:05:10 INFO TaskMemoryManager: 2214902769 bytes of memory are used for execution and 87455554 bytes of memory are used for storage
26/01/05 23:05:10 ERROR Executor: Exception in task 5038.0 in stage 185.0 (TID 172030)
org.apache.spark.memory.SparkOutOfMemoryError: [UNABLE_TO_ACQUIRE_MEMORY] Unable to acquire 16384 bytes of memory, got 0.
	at org.apache.spark.errors.SparkCoreErrors$.outOfMemoryError(SparkCoreErrors.scala:467)
	at org.apache.spark.errors.SparkCoreErrors.outOfMemoryError(SparkCoreErrors.scala)
	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
	at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:384)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:467)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:487)
	at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.$anonfun$add$2(ExternalAppendOnlyUnsafeRowArray.scala:149)
	at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.$anonfun$add$2$adapted(ExternalAppendOnlyUnsafeRowArray.scala:143)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.add(ExternalAppendOnlyUnsafeRowArray.scala:143)
	at org.apache.spark.sql.execution.window.WindowExec$$anon$1.fetchNextPartition(WindowExec.scala:147)
	at org.apache.spark.sql.execution.window.WindowExec$$anon$1.next(WindowExec.scala:180)
	at org.apache.spark.sql.execution.window.WindowExec$$anon$1.next(WindowExec.scala:107)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:514)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:199)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:84)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:114)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748) {code}
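The hazard described above can be sketched as a check-then-act race on a shared field. This is a minimal standalone illustration with hypothetical names ({{SorterSketch}}, {{pointerArray}}), not Spark's actual code: if {{freeMemory}} is not synchronized, two concurrent spill callers can both observe the array as live and the accounting can diverge from the memory actually released; making the method synchronized ensures the array is released and counted exactly once.
{code:java}
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for an in-memory sorter; not Spark's UnsafeInMemorySorter.
class SorterSketch {
    private long[] pointerArray = new long[1024]; // stands in for the sort pointer array
    private final AtomicLong totalFreed = new AtomicLong();

    // Thread-safe variant: the null check and the release happen atomically,
    // so concurrent spill() callers cannot free/count the same array twice.
    synchronized long freeMemory() {
        if (pointerArray == null) {
            return 0; // already freed by a concurrent spill
        }
        long bytes = pointerArray.length * 8L;
        pointerArray = null;
        totalFreed.addAndGet(bytes);
        return bytes;
    }

    long totalFreed() { return totalFreed.get(); }

    public static void main(String[] args) throws InterruptedException {
        SorterSketch sorter = new SorterSketch();
        // Two "spilling" threads race to free the same sorter.
        Thread t1 = new Thread(sorter::freeMemory);
        Thread t2 = new Thread(sorter::freeMemory);
        t1.start(); t2.start();
        t1.join(); t2.join();
        // With synchronization the 8192 bytes are accounted exactly once,
        // regardless of which thread wins the race.
        System.out.println("total freed: " + sorter.totalFreed());
    }
}
{code}
Without the {{synchronized}} keyword the two threads may interleave between the null check and the assignment, which is the class of bug this issue reports against UnsafeInMemorySorter.freeMemory.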



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
