JacobZheng created SPARK-47146:
----------------------------------

             Summary: Possible thread leak when doing sort merge join
                 Key: SPARK-47146
                 URL: https://issues.apache.org/jira/browse/SPARK-47146
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.4.0, 3.3.0, 3.2.0
            Reporter: JacobZheng


I have a long-running Spark job. I noticed that an executor was holding a very 
large number of threads, to the point that no threads were available on the 
server. A jstack dump showed a large number of threads named read-ahead. The 
code confirms that these threads are created by ReadAheadInputStream, which 
creates a single-threaded thread pool when it is initialized:
{code:java}
private final ExecutorService executorService =
    ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); {code}
This thread pool is closed by ReadAheadInputStream#close(). 
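
To illustrate why a missed close() matters, here is a minimal, self-contained 
sketch. It does not use Spark: ReadAheadLike is a hypothetical stand-in that 
owns a single daemon thread named read-ahead, and plain java.util.concurrent 
replaces ThreadUtils. It only shows that an instance that is never closed 
keeps its thread alive.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ReadAheadLeakSketch {

  /** Hypothetical stand-in for a stream that owns one background thread. */
  static final class ReadAheadLike implements AutoCloseable {
    private volatile Thread worker;
    private final ExecutorService executor =
        Executors.newSingleThreadExecutor(r -> {
          Thread t = new Thread(r, "read-ahead");
          t.setDaemon(true);
          worker = t;
          return t;
        });

    ReadAheadLike() {
      // Submitting a task forces the pool to start its worker thread.
      executor.submit(() -> { });
    }

    @Override
    public void close() {
      // Interrupt the idle worker and wait for it to exit.
      executor.shutdownNow();
      try {
        if (worker != null) {
          worker.join(5000);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /** Counts live threads whose name is exactly "read-ahead". */
  static long liveReadAheadThreads() {
    return Thread.getAllStackTraces().keySet().stream()
        .filter(t -> t.isAlive() && "read-ahead".equals(t.getName()))
        .count();
  }

  public static void main(String[] args) {
    ReadAheadLike leaked = new ReadAheadLike();  // never closed: thread stays
    ReadAheadLike closed = new ReadAheadLike();
    closed.close();                              // thread is released
    System.out.println("live read-ahead threads: " + liveReadAheadThreads());
  }
}
```

In a long-running executor, every instance that is never closed would add one 
such thread, which matches the growth seen in the jstack output.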

In the normal case, close() is reached through the following call stack:
{code:java}
ts=2024-02-21 17:36:18;thread_name=Executor task launch worker for task 60.0 in stage 71.0 (TID 258);id=330;is_daemon=true;priority=5;TCCL=org.apache.spark.util.MutableURLClassLoader@17233230
    @org.apache.spark.io.ReadAheadInputStream.close()
        at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.close(UnsafeSorterSpillReader.java:149)
        at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.loadNext(UnsafeSorterSpillReader.java:121)
        at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillMerger$1.loadNext(UnsafeSorterSpillMerger.java:87)
        at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.advanceNext(UnsafeExternalRowSorter.java:187)
        at org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:67)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage27.processNext(null:-1)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.smj_findNextJoinRows_0$(null:-1)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.hashAgg_doAggregateWithKeys_1$(null:-1)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.hashAgg_doAggregateWithKeys_0$(null:-1)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.processNext(null:-1)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:779)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.lang.Thread.run(Thread.java:829) {code}
As shown in UnsafeSorterSpillReader#loadNext below, the stream is closed only 
after the last record in the stream has been read.
{code:java}
@Override
public void loadNext() throws IOException {
  // Kill the task in case it has been marked as killed. This logic is from
  // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order
  // to avoid performance overhead. This check is added here in `loadNext()` instead of in
  // `hasNext()` because it's technically possible for the caller to be relying on
  // `getNumRecords()` instead of `hasNext()` to know when to stop.
  if (taskContext != null) {
    taskContext.killTaskIfInterrupted();
  }
  recordLength = din.readInt();
  keyPrefix = din.readLong();
  if (recordLength > arr.length) {
    arr = new byte[recordLength];
    baseObject = arr;
  }
  ByteStreams.readFully(in, arr, 0, recordLength);
  numRecordsRemaining--;
  if (numRecordsRemaining == 0) {
    close();
  }
} {code}
In a sort merge join with an inner join, as soon as either the StreamSide or 
the BufferSide iterator reaches its end, the iterator on the other side is not 
read any further, so its reader is never closed. A similar situation exists for 
left and right outer joins.
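
The early-stop behaviour can be sketched without Spark. In the simplified 
model below, SpillReaderLike and innerJoin are hypothetical names, keys are 
assumed to be sorted and unique, and a boolean flag stands in for releasing the 
read-ahead thread. The reader closes itself only after its last record, in the 
same way as loadNext() above, and the join loop stops as soon as one side is 
exhausted.

```java
import java.util.ArrayList;
import java.util.List;

public class EarlyStopJoinSketch {

  /** Hypothetical reader that closes itself only after its last record. */
  static final class SpillReaderLike {
    private final int[] keys;
    private int next = 0;
    private boolean closed = false;

    SpillReaderLike(int... sortedKeys) {
      this.keys = sortedKeys;
    }

    boolean hasNext() {
      return next < keys.length;
    }

    int loadNext() {
      int key = keys[next++];
      if (next == keys.length) {
        closed = true;  // stands in for close() releasing the thread
      }
      return key;
    }

    boolean isClosed() {
      return closed;
    }
  }

  /** Simplified inner merge join: stops once either side has no more rows. */
  static List<Integer> innerJoin(SpillReaderLike left, SpillReaderLike right) {
    List<Integer> matches = new ArrayList<>();
    if (!left.hasNext() || !right.hasNext()) {
      return matches;
    }
    int l = left.loadNext();
    int r = right.loadNext();
    while (true) {
      if (l == r) {
        matches.add(l);
        if (!left.hasNext() || !right.hasNext()) break;
        l = left.loadNext();
        r = right.loadNext();
      } else if (l < r) {
        if (!left.hasNext()) break;
        l = left.loadNext();
      } else {
        if (!right.hasNext()) break;
        r = right.loadNext();
      }
    }
    return matches;
  }

  public static void main(String[] args) {
    SpillReaderLike left = new SpillReaderLike(1, 2);
    SpillReaderLike right = new SpillReaderLike(1, 2, 3, 4, 5);
    System.out.println("matches      = " + innerJoin(left, right));
    System.out.println("left closed  = " + left.isClosed());
    System.out.println("right closed = " + right.isClosed());
  }
}
```

In this model the shorter side is read to the end and closes itself, while the 
longer side still has unread records when the join stops and therefore stays 
open.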

In short, for several sort merge join types, a thread leak can occur when the 
data is large enough to trigger a spill and some iterators are not read to the 
end.

 


