This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new f1286f324c0 [SPARK-38944][CORE][SQL] Close `UnsafeSorterSpillReader` before `SpillableArrayIterator` throw `ConcurrentModificationException` f1286f324c0 is described below commit f1286f324c032f9a875167fdbb265b4d495752c9 Author: yangjie01 <yangji...@baidu.com> AuthorDate: Tue Apr 26 08:47:54 2022 -0500 [SPARK-38944][CORE][SQL] Close `UnsafeSorterSpillReader` before `SpillableArrayIterator` throw `ConcurrentModificationException` ### What changes were proposed in this pull request? There will be an `UnsafeSorterSpillReader` resource leak (an `InputStream` held by `UnsafeSorterSpillReader`) when `SpillableArrayIterator` throws `ConcurrentModificationException`, so this PR adds a resource cleanup process before `UnsafeSorterSpillReader` throws `ConcurrentModificationException`. ### Why are the changes needed? Fix a file resource leak. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Pass GA - Add new check in `ExternalAppendOnlyUnsafeRowArraySuite` run command: ``` mvn clean install -pl sql/core -am -DskipTests mvn clean test -pl sql/core -Dtest=none -DwildcardSuites=org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArraySuite ``` **Before** ``` - test iterator invalidation (with spill) *** FAILED *** org.apache.spark.io.ReadAheadInputStream478b0739 did not equal null (ExternalAppendOnlyUnsafeRowArraySuite.scala:397) Run completed in 9 seconds, 652 milliseconds. Total number of tests run: 14 Suites: completed 2, aborted 0 Tests: succeeded 13, failed 1, canceled 0, ignored 0, pending 0 *** 1 TEST FAILED *** ``` **After** ``` Run completed in 8 seconds, 535 milliseconds. Total number of tests run: 14 Suites: completed 2, aborted 0 Tests: succeeded 14, failed 0, canceled 0, ignored 0, pending 0 All tests passed. ``` Closes #36262 from LuciferYang/SPARK-38944. 
Authored-by: yangjie01 <yangji...@baidu.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../unsafe/sort/UnsafeExternalSorter.java | 24 ++++++++++++-- .../ExternalAppendOnlyUnsafeRowArray.scala | 11 +++++++ .../ExternalAppendOnlyUnsafeRowArraySuite.scala | 37 ++++++++++++++++++++++ 3 files changed, 70 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index c38327cae8c..d836cf3f0e3 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -18,6 +18,7 @@ package org.apache.spark.util.collection.unsafe.sort; import javax.annotation.Nullable; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.LinkedList; @@ -25,13 +26,14 @@ import java.util.Queue; import java.util.function.Supplier; import com.google.common.annotations.VisibleForTesting; -import org.apache.spark.memory.SparkOutOfMemoryError; +import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.spark.TaskContext; import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.memory.MemoryConsumer; +import org.apache.spark.memory.SparkOutOfMemoryError; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.memory.TooLargePageException; import org.apache.spark.serializer.SerializerManager; @@ -745,7 +747,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { /** * Chain multiple UnsafeSorterIterator together as single one. 
*/ - static class ChainedIterator extends UnsafeSorterIterator { + static class ChainedIterator extends UnsafeSorterIterator implements Closeable { private final Queue<UnsafeSorterIterator> iterators; private UnsafeSorterIterator current; @@ -798,5 +800,23 @@ public final class UnsafeExternalSorter extends MemoryConsumer { @Override public long getKeyPrefix() { return current.getKeyPrefix(); } + + @Override + public void close() throws IOException { + if (iterators != null && !iterators.isEmpty()) { + for (UnsafeSorterIterator iterator : iterators) { + closeIfPossible(iterator); + } + } + if (current != null) { + closeIfPossible(current); + } + } + + private void closeIfPossible(UnsafeSorterIterator iterator) { + if (iterator instanceof Closeable) { + IOUtils.closeQuietly(((Closeable) iterator)); + } + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala index 2c9c91ec40b..4147d75186d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution +import java.io.Closeable + import scala.collection.mutable.ArrayBuffer import org.apache.spark.{SparkEnv, TaskContext} @@ -192,10 +194,14 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray( protected def throwExceptionIfModified(): Unit = { if (expectedModificationsCount != modificationsCount) { + closeIfNeeded() throw QueryExecutionErrors.concurrentModificationOnExternalAppendOnlyUnsafeRowArrayError( classOf[ExternalAppendOnlyUnsafeRowArray].getName) } } + + protected def closeIfNeeded(): Unit = {} + } private[this] class InMemoryBufferIterator(startIndex: Int) @@ -228,6 +234,11 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray( 
currentRow.pointTo(iterator.getBaseObject, iterator.getBaseOffset, iterator.getRecordLength) currentRow } + + override protected def closeIfNeeded(): Unit = iterator match { + case c: Closeable => c.close() + case _ => // do nothing + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala index 98aba3ba25f..f140d867481 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala @@ -17,13 +17,16 @@ package org.apache.spark.sql.execution +import java.util import java.util.ConcurrentModificationException +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.spark._ import org.apache.spark.memory.MemoryTestingUtils import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.util.collection.unsafe.sort.{UnsafeSorterIterator, UnsafeSorterSpillReader} class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSparkContext { private val random = new java.util.Random() @@ -155,6 +158,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar assert(!iterator1.hasNext) intercept[ConcurrentModificationException](iterator1.next()) + checkIteratorClosedWhenThrowConcurrentModificationException(iterator1) } } @@ -178,6 +182,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar assert(!iterator1.hasNext) intercept[ConcurrentModificationException](iterator1.next()) + checkIteratorClosedWhenThrowConcurrentModificationException(iterator1) } } @@ -265,6 +270,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar populateRows(array, 1) assert(!iterator.hasNext) 
intercept[ConcurrentModificationException](iterator.next()) + checkIteratorClosedWhenThrowConcurrentModificationException(iterator) // Clearing the array should also invalidate any old iterators iterator = array.generateIterator() @@ -274,6 +280,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar array.clear() assert(!iterator.hasNext) intercept[ConcurrentModificationException](iterator.next()) + checkIteratorClosedWhenThrowConcurrentModificationException(iterator) } } @@ -292,6 +299,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar populateRows(array, 1) assert(!iterator.hasNext) intercept[ConcurrentModificationException](iterator.next()) + checkIteratorClosedWhenThrowConcurrentModificationException(iterator) // Clearing the array should also invalidate any old iterators iterator = array.generateIterator() @@ -301,6 +309,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar array.clear() assert(!iterator.hasNext) intercept[ConcurrentModificationException](iterator.next()) + checkIteratorClosedWhenThrowConcurrentModificationException(iterator) } } @@ -319,6 +328,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar // Clearing an empty array should also invalidate any old iterators assert(!iterator.hasNext) intercept[ConcurrentModificationException](iterator.next()) + checkIteratorClosedWhenThrowConcurrentModificationException(iterator) } } @@ -372,4 +382,31 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar assert(getNumBytesSpilled > bytesSpilled) } } + + private def checkIteratorClosedWhenThrowConcurrentModificationException( + iterator: Iterator[UnsafeRow]): Unit = { + def getFieldValue(obj: Any, fieldName: String): Any = { + val field = obj.getClass.getDeclaredField(fieldName) + field.setAccessible(true) + field.get(obj) + } + def checkUnsafeSorterSpillReaderClosed( + unsafeSorterIterator: 
UnsafeSorterIterator): Unit = unsafeSorterIterator match { + case reader: UnsafeSorterSpillReader => + // If UnsafeSorterSpillReader is not closed, `in` and `din` are not null + assert(getFieldValue(reader, "in") == null) + assert(getFieldValue(reader, "din") == null) + case _ => // do noting + } + // Only check `SpillableArrayIterator` because `InMemoryBufferIterator` not open the file handle + if (iterator.getClass.getSimpleName.equals("SpillableArrayIterator")) { + val chainedIterator = getFieldValue(iterator, "iterator") + val current = getFieldValue(chainedIterator, "current") + assert(current.isInstanceOf[UnsafeSorterIterator]) + checkUnsafeSorterSpillReaderClosed(current.asInstanceOf[UnsafeSorterIterator]) + val iterators = getFieldValue(chainedIterator, "iterators") + iterators.asInstanceOf[util.Queue[UnsafeSorterIterator]].asScala + .foreach(checkUnsafeSorterSpillReaderClosed) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org