[
https://issues.apache.org/jira/browse/SPARK-47146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dongjoon Hyun closed SPARK-47146.
---------------------------------
> Possible thread leak when doing sort merge join
> -----------------------------------------------
>
> Key: SPARK-47146
> URL: https://issues.apache.org/jira/browse/SPARK-47146
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.2.0, 3.3.0, 3.4.0
> Reporter: JacobZheng
> Assignee: JacobZheng
> Priority: Critical
> Labels: pull-request-available
> Fix For: 3.5.2, 3.4.3, 4.0.0
>
>
> I have a long-running Spark job and found that an executor was holding so
> many threads that none were left available on the server. A jstack thread
> dump shows a large number of threads named read-ahead. Checking the code
> confirms that these threads are created by ReadAheadInputStream. Each
> instance of this class creates a single-threaded thread pool when it is
> initialized:
> {code:java}
> private final ExecutorService executorService =
> ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); {code}
> This thread pool is shut down by ReadAheadInputStream#close().
> In the normal case, close() is reached through the following call stack:
> {code:java}
> ts=2024-02-21 17:36:18;thread_name=Executor task launch worker for task 60.0 in stage 71.0 (TID 258);id=330;is_daemon=true;priority=5;TCCL=org.apache.spark.util.MutableURLClassLoader@17233230
> @org.apache.spark.io.ReadAheadInputStream.close()
> at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.close(UnsafeSorterSpillReader.java:149)
> at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.loadNext(UnsafeSorterSpillReader.java:121)
> at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillMerger$1.loadNext(UnsafeSorterSpillMerger.java:87)
> at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.advanceNext(UnsafeExternalRowSorter.java:187)
> at org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:67)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage27.processNext(null:-1)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.smj_findNextJoinRows_0$(null:-1)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.hashAgg_doAggregateWithKeys_1$(null:-1)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.hashAgg_doAggregateWithKeys_0$(null:-1)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.processNext(null:-1)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:779)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
> at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
> at org.apache.spark.scheduler.Task.run(Task.scala:139)
> at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.lang.Thread.run(Thread.java:829) {code}
> As UnsafeSorterSpillReader#loadNext shows, the stream is closed only once
> every record in it has been read.
> {code:java}
> @Override
> public void loadNext() throws IOException {
>   // Kill the task in case it has been marked as killed. This logic is from
>   // InterruptibleIterator, but we inline it here instead of wrapping the
>   // iterator in order to avoid performance overhead. This check is added here
>   // in `loadNext()` instead of in `hasNext()` because it's technically
>   // possible for the caller to be relying on `getNumRecords()` instead of
>   // `hasNext()` to know when to stop.
>   if (taskContext != null) {
>     taskContext.killTaskIfInterrupted();
>   }
>   recordLength = din.readInt();
>   keyPrefix = din.readLong();
>   if (recordLength > arr.length) {
>     arr = new byte[recordLength];
>     baseObject = arr;
>   }
>   ByteStreams.readFully(in, arr, 0, recordLength);
>   numRecordsRemaining--;
>   if (numRecordsRemaining == 0) {
>     close();
>   }
> } {code}
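
The close-on-exhaustion behavior described above can be reproduced outside Spark. The following is a minimal sketch, not Spark code: ToySpillReader is a hypothetical stand-in for UnsafeSorterSpillReader, and a plain java.util.concurrent executor stands in for the pool that ReadAheadInputStream creates. It only illustrates that a reader which calls close() from loadNext() keeps its executor alive whenever the caller stops early.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SpillReaderLeakDemo {

  /** Hypothetical stand-in for a spill reader that owns a read-ahead executor. */
  public static final class ToySpillReader {
    private final ExecutorService readAhead = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "read-ahead");
      t.setDaemon(true);
      return t;
    });
    private int numRecordsRemaining;

    public ToySpillReader(int numRecords) {
      this.numRecordsRemaining = numRecords;
      // Run a no-op task so the worker thread actually starts.
      readAhead.execute(() -> { });
    }

    public boolean hasNext() {
      return numRecordsRemaining > 0;
    }

    /** Mirrors the pattern in the issue: close only after the last record. */
    public void loadNext() {
      numRecordsRemaining--;
      if (numRecordsRemaining == 0) {
        close();
      }
    }

    public boolean isClosed() {
      return readAhead.isShutdown();
    }

    public void close() {
      readAhead.shutdownNow(); // safe to call more than once
    }
  }

  /** Reads at most recordsToRead records; reports whether the executor was left running. */
  public static boolean executorLeftOpen(int numRecords, int recordsToRead) {
    ToySpillReader reader = new ToySpillReader(numRecords);
    for (int i = 0; i < recordsToRead && reader.hasNext(); i++) {
      reader.loadNext();
    }
    boolean leftOpen = !reader.isClosed();
    reader.close(); // clean up so the demo itself does not leak a thread
    return leftOpen;
  }

  public static void main(String[] args) {
    System.out.println("fully read, executor left open:     " + executorLeftOpen(3, 3));
    System.out.println("partially read, executor left open: " + executorLeftOpen(3, 1));
  }
}
```

In this toy, reading all three records shuts the executor down, while stopping after one record leaves the read-ahead thread running until something else closes the reader.
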
> In a sort merge join executing an inner join, once either the StreamSide or
> the BufferSide iterator reaches its end, the iterator on the other side is
> not read any further. A similar situation exists for left and right outer
> joins.
> In short, for several sort merge join types, a thread leak can occur when
> the data is large enough to trigger a spill and some iterators are not read
> to the end.
>
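
One general way to avoid this kind of leak is to tie the reader's cleanup to the end of the task instead of to exhaustion of the data. The sketch below illustrates that idea under stated assumptions: ToyTaskContext, addCompletionListener, markTaskCompleted, and ToyReader are all hypothetical names invented for this example. It is not the patch that resolved this issue, which is not shown in this message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CloseOnTaskCompletionSketch {

  /** Hypothetical, minimal task context that runs callbacks when the task ends. */
  public static final class ToyTaskContext {
    private final List<Runnable> listeners = new ArrayList<>();

    public void addCompletionListener(Runnable listener) {
      listeners.add(listener);
    }

    public void markTaskCompleted() {
      listeners.forEach(Runnable::run);
    }
  }

  /** Hypothetical reader that also closes itself when its task completes. */
  public static final class ToyReader implements AutoCloseable {
    private final ExecutorService readAhead = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "read-ahead");
      t.setDaemon(true);
      return t;
    });
    private int numRecordsRemaining;

    public ToyReader(int numRecords, ToyTaskContext taskContext) {
      this.numRecordsRemaining = numRecords;
      if (taskContext != null) {
        // Assumption for this sketch: cleanup is registered up front, so it
        // runs even if the caller never reads the last record.
        taskContext.addCompletionListener(this::close);
      }
    }

    public boolean hasNext() {
      return numRecordsRemaining > 0;
    }

    public void loadNext() {
      numRecordsRemaining--;
      if (numRecordsRemaining == 0) {
        close(); // the early close on exhaustion is kept
      }
    }

    public boolean isClosed() {
      return readAhead.isShutdown();
    }

    @Override
    public void close() {
      readAhead.shutdownNow(); // idempotent, so a second call from the listener is harmless
    }
  }

  /** Reads at most recordsToRead records, ends the task, and reports whether the reader is closed. */
  public static boolean closedAfterTaskEnds(int numRecords, int recordsToRead) {
    ToyTaskContext ctx = new ToyTaskContext();
    ToyReader reader = new ToyReader(numRecords, ctx);
    for (int i = 0; i < recordsToRead && reader.hasNext(); i++) {
      reader.loadNext();
    }
    ctx.markTaskCompleted();
    return reader.isClosed();
  }

  public static void main(String[] args) {
    System.out.println("partially read, closed after task ends: " + closedAfterTaskEnds(3, 1));
    System.out.println("fully read, closed after task ends:     " + closedAfterTaskEnds(3, 3));
  }
}
```

Because close() is idempotent here, the reader can still shut down early when it is read to the end, and the completion callback only matters for readers that were abandoned partway through.
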