[ https://issues.apache.org/jira/browse/SPARK-47146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823832#comment-17823832 ]
Mridul Muralidharan commented on SPARK-47146:
---------------------------------------------

Backported to 3.5 and 3.4 in PR: https://github.com/apache/spark/pull/45390

> Possible thread leak when doing sort merge join
> -----------------------------------------------
>
> Key: SPARK-47146
> URL: https://issues.apache.org/jira/browse/SPARK-47146
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.2.0, 3.3.0, 3.4.0
> Reporter: JacobZheng
> Assignee: JacobZheng
> Priority: Critical
> Labels: pull-request-available
> Fix For: 4.0.0, 3.5.2, 3.4.3
>
> I have a long-running Spark job and noticed an executor taking up a large number of
> threads, to the point that no threads were left available on the server. Querying the
> thread details with jstack showed a huge number of threads named read-ahead. Checking
> the code confirms that these threads are created by ReadAheadInputStream: each instance
> creates its own single-threaded thread pool when it is initialized.
> {code:java}
> private final ExecutorService executorService =
>     ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
> {code}
> This thread pool is shut down in ReadAheadInputStream#close().
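The per-instance thread pool described above can be illustrated with a small, self-contained Java sketch. It is not Spark code: `ReadAheadSketch`, `openWithoutClosing` and `liveReadAheadThreads` are hypothetical names, and Spark's `ThreadUtils.newDaemonSingleThreadExecutor` is replaced by a plain `Executors.newFixedThreadPool(1, ...)` with a daemon thread factory. Under those assumptions it shows that every stream which is opened but never closed keeps one "read-ahead" thread alive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ReadAheadLeakDemo {

    // Simplified, hypothetical stand-in for ReadAheadInputStream: each instance
    // owns a single daemon worker thread named "read-ahead" that is released
    // only when close() shuts the pool down.
    static final class ReadAheadSketch implements AutoCloseable {
        private final ExecutorService executorService =
            Executors.newFixedThreadPool(1, r -> {
                Thread t = new Thread(r, "read-ahead");
                t.setDaemon(true);
                return t;
            });

        ReadAheadSketch() {
            // Submit an (empty) prefetch task so the worker thread is started,
            // as a real read-ahead stream does once it begins reading.
            executorService.execute(() -> { });
        }

        @Override
        public void close() throws InterruptedException {
            executorService.shutdownNow();
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    // Counts live threads whose name is exactly "read-ahead".
    static long liveReadAheadThreads() {
        return Thread.getAllStackTraces().keySet().stream()
            .filter(t -> t.isAlive() && t.getName().equals("read-ahead"))
            .count();
    }

    // Opens n streams and never closes them, mimicking readers that are
    // abandoned before their data has been read through.
    static List<ReadAheadSketch> openWithoutClosing(int n) {
        List<ReadAheadSketch> streams = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            streams.add(new ReadAheadSketch());
        }
        return streams;
    }

    public static void main(String[] args) throws Exception {
        long before = liveReadAheadThreads();
        List<ReadAheadSketch> streams = openWithoutClosing(5);
        System.out.println("leaked read-ahead threads: " + (liveReadAheadThreads() - before));
        for (ReadAheadSketch s : streams) {
            s.close(); // closing each stream releases its worker thread
        }
    }
}
```

Because each pool's core thread never times out, the thread count grows by one per unclosed stream and only drops back once close() is called.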
> In the normal case, close() is reached through the following call stack:
> {code:java}
> ts=2024-02-21 17:36:18;thread_name=Executor task launch worker for task 60.0 in stage 71.0 (TID 258);id=330;is_daemon=true;priority=5;TCCL=org.apache.spark.util.MutableURLClassLoader@17233230
>     @org.apache.spark.io.ReadAheadInputStream.close()
>         at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.close(UnsafeSorterSpillReader.java:149)
>         at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.loadNext(UnsafeSorterSpillReader.java:121)
>         at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillMerger$1.loadNext(UnsafeSorterSpillMerger.java:87)
>         at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.advanceNext(UnsafeExternalRowSorter.java:187)
>         at org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:67)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage27.processNext(null:-1)
>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.smj_findNextJoinRows_0$(null:-1)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.hashAgg_doAggregateWithKeys_1$(null:-1)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.hashAgg_doAggregateWithKeys_0$(null:-1)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.processNext(null:-1)
>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:779)
>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>         at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
>         at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>         at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
>         at org.apache.spark.scheduler.Task.run(Task.scala:139)
>         at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.lang.Thread.run(Thread.java:829)
> {code}
> As UnsafeSorterSpillReader#loadNext shows, the spill reader (and with it the
> ReadAheadInputStream) is closed only once every record in the stream has been read.
> {code:java}
> @Override
> public void loadNext() throws IOException {
>   // Kill the task in case it has been marked as killed. This logic is from
>   // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order
>   // to avoid performance overhead. This check is added here in `loadNext()` instead of in
>   // `hasNext()` because it's technically possible for the caller to be relying on
>   // `getNumRecords()` instead of `hasNext()` to know when to stop.
>   if (taskContext != null) {
>     taskContext.killTaskIfInterrupted();
>   }
>   recordLength = din.readInt();
>   keyPrefix = din.readLong();
>   if (recordLength > arr.length) {
>     arr = new byte[recordLength];
>     baseObject = arr;
>   }
>   ByteStreams.readFully(in, arr, 0, recordLength);
>   numRecordsRemaining--;
>   if (numRecordsRemaining == 0) {
>     close();
>   }
> }
> {code}
> In an inner sort merge join, as soon as either the streamed-side or the buffered-side
> iterator reaches its end, the join stops, so the unread iterator on the other side is
> never read any further. Left and right outer joins have a similar problem.
> In short, for several sort merge join types, when the data is large enough to trigger
> a spill and some iterators are not read through, their spill readers are never closed
> and the read-ahead threads they own are leaked.
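The interaction between close-on-exhaustion and an early-terminating inner join can be modeled in plain Java. This is a simplified sketch under stated assumptions, not Spark code: `SpillReaderSketch` is a hypothetical stand-in for UnsafeSorterSpillReader whose `close()` only sets a flag where the real reader would close its ReadAheadInputStream, keys are assumed sorted and distinct, and `joinAndAlwaysClose` shows one generic mitigation (closing every reader once the consumer is done) that is not necessarily how the committed fix works.

```java
import java.util.ArrayList;
import java.util.List;

public class SortMergeJoinLeakDemo {

    // Hypothetical stand-in for UnsafeSorterSpillReader. As in the quoted
    // loadNext(), the reader closes itself only after its last record is read.
    static final class SpillReaderSketch implements AutoCloseable {
        private final int[] keys;
        private int pos = 0;
        private boolean closed = false;

        SpillReaderSketch(int... sortedKeys) {
            this.keys = sortedKeys.clone();
        }

        boolean hasNext() {
            return pos < keys.length;
        }

        int loadNext() {
            int key = keys[pos++];
            if (pos == keys.length) {
                close(); // the only automatic close: all records have been read
            }
            return key;
        }

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            // A real reader would close its ReadAheadInputStream here, shutting
            // down the "read-ahead" thread; this sketch only records the fact.
            closed = true;
        }
    }

    // Inner merge join over sorted, distinct keys. It stops as soon as either
    // side runs out, so the other side may never be read through.
    static List<Integer> innerJoin(SpillReaderSketch left, SpillReaderSketch right) {
        List<Integer> out = new ArrayList<>();
        if (!left.hasNext() || !right.hasNext()) {
            return out;
        }
        int l = left.loadNext();
        int r = right.loadNext();
        while (true) {
            if (l == r) {
                out.add(l);
                if (!left.hasNext() || !right.hasNext()) break;
                l = left.loadNext();
                r = right.loadNext();
            } else if (l < r) {
                if (!left.hasNext()) break;
                l = left.loadNext();
            } else {
                if (!right.hasNext()) break;
                r = right.loadNext();
            }
        }
        return out;
    }

    // Generic mitigation (an assumption, not necessarily the committed fix):
    // close both readers unconditionally once the join has finished.
    static List<Integer> joinAndAlwaysClose(SpillReaderSketch left, SpillReaderSketch right) {
        try {
            return innerJoin(left, right);
        } finally {
            left.close();  // close() is idempotent, so closing a drained reader is harmless
            right.close();
        }
    }

    public static void main(String[] args) {
        SpillReaderSketch left = new SpillReaderSketch(1, 2, 3);
        SpillReaderSketch right = new SpillReaderSketch(2, 3, 4, 5, 6, 7);
        System.out.println(innerJoin(left, right));                   // [2, 3]
        System.out.println(left.isClosed() + " " + right.isClosed()); // true false
    }
}
```

With left keys 1, 2, 3 and right keys 2 through 7, the join returns [2, 3]: the left reader is drained and closes itself, while the right reader still holds four unread records and stays open, which in the real code would keep its read-ahead thread alive.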