This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new dfb35bed522c [SPARK-47146][CORE] Possible thread leak when doing sort merge join dfb35bed522c is described below commit dfb35bed522ca706f8fc18e37c05c1766c8d8a18 Author: JacobZheng0927 <zsh517559...@163.com> AuthorDate: Mon Mar 4 23:17:32 2024 -0600 [SPARK-47146][CORE] Possible thread leak when doing sort merge join ### What changes were proposed in this pull request? Add a TaskCompletionListener that closes the input stream, to avoid the thread leak caused by an unclosed ReadAheadInputStream. ### Why are the changes needed? SPARK-40849 modified the implementation of `newDaemonSingleThreadExecutor` to use `newFixedThreadPool` instead of `newSingleThreadExecutor`. The difference is that `newSingleThreadExecutor` uses the `FinalizableDelegatedExecutorService`, which provides a `finalize` method that automatically shuts down the thread pool. In some cases, sort merge join execution uses `ReadAheadInputStream` and does not close it, so this change caused a thread leak. Since finalization is deprecated and subject to re [...] ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test. ### Was this patch authored or co-authored using generative AI tooling? No Closes #45327 from JacobZheng0927/SPARK-47146. 
Authored-by: JacobZheng0927 <zsh517559...@163.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> --- .../unsafe/sort/UnsafeSorterSpillReader.java | 12 ++++++++ .../scala/org/apache/spark/sql/JoinSuite.scala | 33 +++++++++++++++++++++- 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java index db79efd00853..8bd44c8c52c1 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java @@ -28,6 +28,8 @@ import org.apache.spark.io.ReadAheadInputStream; import org.apache.spark.serializer.SerializerManager; import org.apache.spark.storage.BlockId; import org.apache.spark.unsafe.Platform; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.*; @@ -36,6 +38,7 @@ import java.io.*; * of the file format). 
*/ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implements Closeable { + private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class); public static final int MAX_BUFFER_SIZE_BYTES = 16777216; // 16 mb private InputStream in; @@ -82,6 +85,15 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen Closeables.close(bs, /* swallowIOException = */ true); throw e; } + if (taskContext != null) { + taskContext.addTaskCompletionListener(context -> { + try { + close(); + } catch (IOException e) { + logger.info("error while closing UnsafeSorterSpillReader", e); + } + }); + } } @Override diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index f31f60e8df56..be6862f5b96b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -23,6 +23,7 @@ import scala.collection.mutable.ListBuffer import scala.jdk.CollectionConverters._ import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled} +import org.apache.spark.internal.config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.expressions.{Ascending, GenericRow, SortOrder} @@ -34,7 +35,7 @@ import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleExch import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.execution.python.BatchEvalPythonExec import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession} import org.apache.spark.sql.types.StructType import org.apache.spark.tags.SlowSQLTest @@ -1737,3 +1738,33 @@ class JoinSuite extends QueryTest with SharedSparkSession with 
AdaptiveSparkPlan } } } + +class ThreadLeakInSortMergeJoinSuite + extends QueryTest + with SharedSparkSession + with AdaptiveSparkPlanHelper { + + setupTestData() + override protected def createSparkSession: TestSparkSession = { + SparkSession.cleanupAnyExistingSession() + new TestSparkSession( + sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20)) + } + + test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") { + + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") { + + assertSpilled(sparkContext, "inner join") { + sql("SELECT * FROM testData JOIN testData2 ON key = a").collect() + } + + val readAheadThread = Thread.getAllStackTraces.keySet().asScala + .find { + _.getName.startsWith("read-ahead") + } + assert(readAheadThread.isEmpty) + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org