[ 
https://issues.apache.org/jira/browse/SPARK-47146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-47146.
-----------------------------------------
    Fix Version/s: 4.0.0
       Resolution: Fixed

Issue resolved by pull request 45327
[https://github.com/apache/spark/pull/45327]

> Possible thread leak when doing sort merge join
> -----------------------------------------------
>
>                 Key: SPARK-47146
>                 URL: https://issues.apache.org/jira/browse/SPARK-47146
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.2.0, 3.3.0, 3.4.0
>            Reporter: JacobZheng
>            Assignee: JacobZheng
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 4.0.0
>
>
> I have a long-running Spark job. I found that an executor was holding a very 
> large number of threads, leaving no threads available on the server. Thread 
> details from jstack show a huge number of threads named read-ahead. Checking 
> the code confirms that these threads are created by ReadAheadInputStream. 
> When it is initialized, this class creates a single-threaded thread pool:
> {code:java}
> private final ExecutorService executorService =
>     ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); {code}
> This thread pool is shut down by ReadAheadInputStream#close().
> In the normal case, close() is reached through the following call stack:
> {code:java}
> ts=2024-02-21 17:36:18;thread_name=Executor task launch worker for task 60.0 in stage 71.0 (TID 258);id=330;is_daemon=true;priority=5;TCCL=org.apache.spark.util.MutableURLClassLoader@17233230
>     @org.apache.spark.io.ReadAheadInputStream.close()
>         at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.close(UnsafeSorterSpillReader.java:149)
>         at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.loadNext(UnsafeSorterSpillReader.java:121)
>         at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillMerger$1.loadNext(UnsafeSorterSpillMerger.java:87)
>         at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.advanceNext(UnsafeExternalRowSorter.java:187)
>         at org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:67)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage27.processNext(null:-1)
>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.smj_findNextJoinRows_0$(null:-1)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.hashAgg_doAggregateWithKeys_1$(null:-1)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.hashAgg_doAggregateWithKeys_0$(null:-1)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.processNext(null:-1)
>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:779)
>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>         at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
>         at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>         at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
>         at org.apache.spark.scheduler.Task.run(Task.scala:139)
>         at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.lang.Thread.run(Thread.java:829) {code}
> As UnsafeSorterSpillReader#loadNext shows, the stream is closed only after 
> all of its records have been read:
> {code:java}
> @Override
> public void loadNext() throws IOException {
>   // Kill the task in case it has been marked as killed. This logic is from
>   // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order
>   // to avoid performance overhead. This check is added here in `loadNext()` instead of in
>   // `hasNext()` because it's technically possible for the caller to be relying on
>   // `getNumRecords()` instead of `hasNext()` to know when to stop.
>   if (taskContext != null) {
>     taskContext.killTaskIfInterrupted();
>   }
>   recordLength = din.readInt();
>   keyPrefix = din.readLong();
>   if (recordLength > arr.length) {
>     arr = new byte[recordLength];
>     baseObject = arr;
>   }
>   ByteStreams.readFully(in, arr, 0, recordLength);
>   numRecordsRemaining--;
>   if (numRecordsRemaining == 0) {
>     close();
>   }
> } {code}
> In a sort merge join used for an inner join, as soon as either the StreamSide 
> or the BufferSide iterator reaches its end, the iterator on the other side 
> is no longer read. A similar situation exists for left and right outer joins.
> In short, for several sort merge join types, a thread leak can occur when the 
> data is large enough to trigger a spill and at least one spill iterator is 
> not read to the end, because its ReadAheadInputStream is never closed.
>  
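
The leak pattern described above can be reproduced outside Spark with a small, self-contained sketch. Everything in it is hypothetical: SpillReaderSketch and SortMergeJoinLeakSketch are illustrative stand-ins, not Spark classes, and the try/finally block merely stands in for some end-of-task cleanup hook (in Spark this might be a task completion listener). It is not a description of what pull request 45327 actually changes.

```java
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for a spill reader that owns a single-thread "read-ahead" pool. */
final class SpillReaderSketch implements Closeable {
  private final ExecutorService readAhead;
  private int numRecordsRemaining;
  private boolean closed = false;

  SpillReaderSketch(int numRecords) {
    // One daemon-thread pool per reader, similar in spirit to the pool in the report.
    this.readAhead = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "read-ahead");
      t.setDaemon(true);
      return t;
    });
    this.numRecordsRemaining = numRecords;
  }

  boolean hasNext() {
    return numRecordsRemaining > 0;
  }

  void loadNext() {
    if (numRecordsRemaining <= 0) {
      throw new IllegalStateException("no records left");
    }
    numRecordsRemaining--;
    // As in the quoted loadNext(): the reader closes itself only when fully consumed.
    if (numRecordsRemaining == 0) {
      close();
    }
  }

  synchronized boolean isClosed() {
    return closed;
  }

  /** Idempotent, so an extra cleanup call after self-closing is harmless. */
  @Override
  public synchronized void close() {
    if (closed) {
      return;
    }
    closed = true;
    readAhead.shutdownNow();
  }
}

final class SortMergeJoinLeakSketch {
  private static int countOpen(SpillReaderSketch left, SpillReaderSketch right) {
    return (left.isClosed() ? 0 : 1) + (right.isClosed() ? 0 : 1);
  }

  /** Reads both sides in lockstep and stops when either ends; returns readers left open. */
  static int joinWithoutCleanup(int leftRecords, int rightRecords) {
    SpillReaderSketch left = new SpillReaderSketch(leftRecords);
    SpillReaderSketch right = new SpillReaderSketch(rightRecords);
    while (left.hasNext() && right.hasNext()) {
      left.loadNext();
      right.loadNext();
    }
    return countOpen(left, right);
  }

  /** Same loop, but both readers are always closed when the "task" ends. */
  static int joinWithCleanup(int leftRecords, int rightRecords) {
    SpillReaderSketch left = new SpillReaderSketch(leftRecords);
    SpillReaderSketch right = new SpillReaderSketch(rightRecords);
    try {
      while (left.hasNext() && right.hasNext()) {
        left.loadNext();
        right.loadNext();
      }
    } finally {
      left.close();
      right.close();
    }
    return countOpen(left, right);
  }
}
```

With sides of unequal length, for example joinWithoutCleanup(3, 5), the longer side is never read to the end and its pool stays open, while joinWithCleanup(3, 5) leaves nothing open. Making close() idempotent is a deliberate choice in this sketch, so that the cleanup path can run unconditionally even when a reader has already closed itself.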



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
