This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 61aa65d9f89 [SPARK-38847][CORE][SQL][SS] Introduce a `viewToSeq` function for `KVUtils` to close `KVStoreIterator` in time
61aa65d9f89 is described below

commit 61aa65d9f897f29813cbbc77b6d0dbad770c8954
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Tue Apr 12 09:25:03 2022 -0500

    [SPARK-38847][CORE][SQL][SS] Introduce a `viewToSeq` function for `KVUtils` to close `KVStoreIterator` in time

    ### What changes were proposed in this pull request?
    Many places in Spark convert a `KVStoreView` into a Scala `List` without closing the underlying
    `KVStoreIterator`; the iterator is then reclaimed only by the `finalize()` methods implemented in
    `LevelDB` and `RocksDB`, which makes `KVStoreIterator` resource recycling unpredictable. This PR
    introduces a `viewToSeq` function in `KVUtils` that converts all data in a `KVStoreView` into a
    Scala `List` and closes the `KVStoreIterator` promptly.

    ### Why are the changes needed?
    Add a general function that converts a `KVStoreView` into a Scala `List` and closes the
    `KVStoreIterator` in time.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Pass GA.

    Closes #36132 from LuciferYang/kvutils-viewToSeq.

    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../spark/deploy/history/FsHistoryProvider.scala   | 26 +++++++---------------
 .../deploy/history/HistoryServerDiskManager.scala  |  8 +++----
 .../apache/spark/status/AppStatusListener.scala    | 10 ++++-----
 .../org/apache/spark/status/AppStatusStore.scala   |  2 +-
 .../scala/org/apache/spark/status/KVUtils.scala    |  8 +++++++
 .../deploy/history/FsHistoryProviderSuite.scala    | 10 ++++-----
 .../spark/status/AppStatusListenerSuite.scala      | 23 +++++++++----------
 .../spark/sql/execution/ui/SQLAppStatusStore.scala |  6 ++---
 .../execution/ui/StreamingQueryStatusStore.scala   |  6 ++---
 .../ui/StreamingQueryStatusListener.scala          |  2 +-
 .../ui/HiveThriftServer2AppStatusStore.scala       |  5 +++--
 11 files changed, 50 insertions(+), 56 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index a9adaed374a..dddb7da617f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -592,11 +592,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       // Only entries with valid applications are cleaned up here. Cleaning up invalid log
       // files is done by the periodic cleaner task.
       val stale = listing.synchronized {
-        listing.view(classOf[LogInfo])
+        KVUtils.viewToSeq(listing.view(classOf[LogInfo])
           .index("lastProcessed")
-          .last(newLastScanTime - 1)
-          .asScala
-          .toList
+          .last(newLastScanTime - 1))
       }
       stale.filterNot(isProcessing)
         .filterNot(info => notStale.contains(info.logPath))
@@ -957,12 +955,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
     val maxNum = conf.get(MAX_LOG_NUM)
 
-    val expired = listing.view(classOf[ApplicationInfoWrapper])
+    val expired = KVUtils.viewToSeq(listing.view(classOf[ApplicationInfoWrapper])
       .index("oldestAttempt")
       .reverse()
-      .first(maxTime)
-      .asScala
-      .toList
+      .first(maxTime))
     expired.foreach { app =>
       // Applications may have multiple attempts, some of which may not need to be deleted yet.
       val (remaining, toDelete) = app.attempts.partition { attempt =>
@@ -972,13 +968,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
 
     // Delete log files that don't have a valid application and exceed the configured max age.
-    val stale = listing.view(classOf[LogInfo])
+    val stale = KVUtils.viewToSeq(listing.view(classOf[LogInfo])
       .index("lastProcessed")
       .reverse()
-      .first(maxTime)
-      .asScala
-      .filter { l => l.logType == null || l.logType == LogType.EventLogs }
-      .toList
+      .first(maxTime), Int.MaxValue) { l => l.logType == null || l.logType == LogType.EventLogs }
     stale.filterNot(isProcessing).foreach { log =>
       if (log.appId.isEmpty) {
         logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
@@ -1080,13 +1073,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
     // Delete driver log file entries that exceed the configured max age and
     // may have been deleted on filesystem externally.
-    val stale = listing.view(classOf[LogInfo])
+    val stale = KVUtils.viewToSeq(listing.view(classOf[LogInfo])
       .index("lastProcessed")
       .reverse()
-      .first(maxTime)
-      .asScala
-      .filter { l => l.logType != null && l.logType == LogType.DriverLogs }
-      .toList
+      .first(maxTime), Int.MaxValue) { l => l.logType != null && l.logType == LogType.DriverLogs }
     stale.filterNot(isProcessing).foreach { log =>
       logInfo(s"Deleting invalid driver log ${log.logPath}")
       listing.delete(classOf[LogInfo], log.logPath)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
index 72d407d8643..6759d890d0d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
@@ -20,7 +20,6 @@ package org.apache.spark.deploy.history
 import java.io.File
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, ListBuffer}
 
 import org.apache.commons.io.FileUtils
@@ -29,6 +28,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.History._
 import org.apache.spark.internal.config.History.HybridStoreDiskBackend.LEVELDB
+import org.apache.spark.status.KVUtils
 import org.apache.spark.status.KVUtils._
 import org.apache.spark.util.{Clock, Utils}
 import org.apache.spark.util.kvstore.KVStore
@@ -78,10 +78,8 @@ private class HistoryServerDiskManager(
 
     // Go through the recorded store directories and remove any that may have been removed by
     // external code.
-    val (existences, orphans) = listing
-      .view(classOf[ApplicationStoreInfo])
-      .asScala
-      .toSeq
+    val (existences, orphans) = KVUtils.viewToSeq(listing
+      .view(classOf[ApplicationStoreInfo]))
       .partition { info =>
         new File(info.path).exists()
       }
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 35c43b06c28..06008988947 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -1249,8 +1249,8 @@ private[spark] class AppStatusListener(
     if (dead > threshold) {
       val countToDelete = calculateNumberToRemove(dead, threshold)
-      val toDelete = kvstore.view(classOf[ExecutorSummaryWrapper]).index("active")
-        .max(countToDelete).first(false).last(false).asScala.toSeq
+      val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[ExecutorSummaryWrapper]).index("active")
+        .max(countToDelete).first(false).last(false))
       toDelete.foreach { e => kvstore.delete(e.getClass(), e.info.id) }
     }
   }
@@ -1406,12 +1406,10 @@ private[spark] class AppStatusListener(
   }
 
   private def cleanupCachedQuantiles(stageKey: Array[Int]): Unit = {
-    val cachedQuantiles = kvstore.view(classOf[CachedQuantile])
+    val cachedQuantiles = KVUtils.viewToSeq(kvstore.view(classOf[CachedQuantile])
       .index("stage")
       .first(stageKey)
-      .last(stageKey)
-      .asScala
-      .toList
+      .last(stageKey))
     cachedQuantiles.foreach { q =>
       kvstore.delete(q.getClass(), q.id)
     }
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 03767ee83a9..34155e3e330 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -691,7 +691,7 @@ private[spark] class AppStatusStore(
   }
 
   def streamBlocksList(): Seq[StreamBlockData] = {
-    store.view(classOf[StreamBlockData]).asScala.toSeq
+    KVUtils.viewToSeq(store.view(classOf[StreamBlockData]))
   }
 
   def operationGraphForStage(stageId: Int): RDDOperationGraph = {
diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
index 7a4b613ac06..e422bf3c05a 100644
--- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala
+++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.History.HYBRID_STORE_DISK_BACKEND
 import org.apache.spark.internal.config.History.HybridStoreDiskBackend
 import org.apache.spark.internal.config.History.HybridStoreDiskBackend._
+import org.apache.spark.util.Utils
 import org.apache.spark.util.kvstore._
 
 private[spark] object KVUtils extends Logging {
@@ -92,6 +93,13 @@ private[spark] object KVUtils extends Logging {
     }
   }
 
+  /** Turns a KVStoreView into a Scala sequence. */
+  def viewToSeq[T](view: KVStoreView[T]): Seq[T] = {
+    Utils.tryWithResource(view.closeableIterator()) { iter =>
+      iter.asScala.toList
+    }
+  }
+
   private[spark] class MetadataMismatchException extends Exception
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index b05b9de68dc..398a220ebd1 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -23,7 +23,6 @@ import java.util.{Date, Locale}
 import java.util.concurrent.TimeUnit
 import java.util.zip.{ZipInputStream, ZipOutputStream}
 
-import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 
 import com.google.common.io.{ByteStreams, Files}
@@ -50,6 +49,7 @@ import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.security.GroupMappingServiceProvider
 import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.KVUtils
 import org.apache.spark.status.KVUtils.KVStoreScalaSerializer
 import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
 import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}
@@ -684,12 +684,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     log3.setLastModified(clock.getTimeMillis())
     // This should not trigger any cleanup
     provider.cleanDriverLogs()
-    provider.listing.view(classOf[LogInfo]).iterator().asScala.toSeq.size should be(3)
+    KVUtils.viewToSeq(provider.listing.view(classOf[LogInfo])).size should be(3)
 
     // Should trigger cleanup for first file but not second one
     clock.setTime(firstFileModifiedTime + TimeUnit.SECONDS.toMillis(maxAge) + 1)
     provider.cleanDriverLogs()
-    provider.listing.view(classOf[LogInfo]).iterator().asScala.toSeq.size should be(2)
+    KVUtils.viewToSeq(provider.listing.view(classOf[LogInfo])).size should be(2)
     assert(!log1.exists())
     assert(log2.exists())
     assert(log3.exists())
@@ -700,7 +700,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     // Should cleanup the second file but not the third file, as filelength changed.
     clock.setTime(secondFileModifiedTime + TimeUnit.SECONDS.toMillis(maxAge) + 1)
     provider.cleanDriverLogs()
-    provider.listing.view(classOf[LogInfo]).iterator().asScala.toSeq.size should be(1)
+    KVUtils.viewToSeq(provider.listing.view(classOf[LogInfo])).size should be(1)
     assert(!log1.exists())
     assert(!log2.exists())
     assert(log3.exists())
@@ -708,7 +708,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     // Should cleanup the third file as well.
     clock.setTime(secondFileModifiedTime + 2 * TimeUnit.SECONDS.toMillis(maxAge) + 2)
     provider.cleanDriverLogs()
-    provider.listing.view(classOf[LogInfo]).iterator().asScala.toSeq.size should be(0)
+    KVUtils.viewToSeq(provider.listing.view(classOf[LogInfo])).size should be(0)
     assert(!log3.exists())
   }
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 105b447f9c7..f5ccce7f1d9 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -254,8 +254,8 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
       assert(stage.info.memoryBytesSpilled === s1Tasks.size * value)
     }
 
-    val execs = store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
-      .first(key(stages.head)).last(key(stages.head)).asScala.toSeq
+    val execs = KVUtils.viewToSeq(store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
+      .first(key(stages.head)).last(key(stages.head)))
     assert(execs.size > 0)
     execs.foreach { exec =>
       assert(exec.info.memoryBytesSpilled === s1Tasks.size * value / 2)
@@ -272,10 +272,9 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
       stageAttemptId = stages.head.attemptNumber))
 
     val executorStageSummaryWrappers =
-      store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
+      KVUtils.viewToSeq(store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
        .first(key(stages.head))
-        .last(key(stages.head))
-        .asScala.toSeq
+        .last(key(stages.head)))
 
     assert(executorStageSummaryWrappers.nonEmpty)
     executorStageSummaryWrappers.foreach { exec =>
@@ -301,10 +300,9 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
       stageAttemptId = stages.head.attemptNumber))
 
     val executorStageSummaryWrappersForNode =
-      store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
+      KVUtils.viewToSeq(store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
        .first(key(stages.head))
-        .last(key(stages.head))
-        .asScala.toSeq
+        .last(key(stages.head)))
 
     assert(executorStageSummaryWrappersForNode.nonEmpty)
     executorStageSummaryWrappersForNode.foreach { exec =>
@@ -1364,13 +1362,13 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
       TaskKilled(reason = "Killed"), tasks(1), new ExecutorMetrics, null))
 
     // Ensure killed task metrics are updated
-    val allStages = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
+    val allStages = KVUtils.viewToSeq(store.view(classOf[StageDataWrapper]).reverse()).map(_.info)
     val failedStages = allStages.filter(_.status == v1.StageStatus.FAILED)
     assert(failedStages.size == 1)
     assert(failedStages.head.numKilledTasks == 1)
     assert(failedStages.head.numCompleteTasks == 1)
 
-    val allJobs = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
+    val allJobs = KVUtils.viewToSeq(store.view(classOf[JobDataWrapper]).reverse()).map(_.info)
     assert(allJobs.size == 1)
     assert(allJobs.head.numKilledTasks == 1)
     assert(allJobs.head.numCompletedTasks == 1)
@@ -1427,14 +1425,15 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
       ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), new ExecutorMetrics,
       null))
 
-    val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
+    val esummary = KVUtils.viewToSeq(store.view(classOf[ExecutorStageSummaryWrapper])).map(_.info)
     esummary.foreach { execSummary =>
       assert(execSummary.failedTasks === 1)
       assert(execSummary.succeededTasks === 1)
       assert(execSummary.killedTasks === 0)
     }
 
-    val allExecutorSummary = store.view(classOf[ExecutorSummaryWrapper]).asScala.map(_.info)
+    val allExecutorSummary =
+      KVUtils.viewToSeq(store.view(classOf[ExecutorSummaryWrapper])).map(_.info)
     assert(allExecutorSummary.size === 2)
     allExecutorSummary.foreach { allExecSummary =>
       assert(allExecSummary.failedTasks === 1)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
index 7c3315e3d76..95035c08a2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -20,13 +20,13 @@ package org.apache.spark.sql.execution.ui
 import java.lang.{Long => JLong}
 import java.util.Date
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import com.fasterxml.jackson.annotation.JsonIgnore
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 
 import org.apache.spark.JobExecutionStatus
+import org.apache.spark.status.KVUtils
 import org.apache.spark.status.KVUtils.KVIndexParam
 import org.apache.spark.util.kvstore.{KVIndex, KVStore}
 
@@ -39,11 +39,11 @@ class SQLAppStatusStore(
     val listener: Option[SQLAppStatusListener] = None) {
 
   def executionsList(): Seq[SQLExecutionUIData] = {
-    store.view(classOf[SQLExecutionUIData]).asScala.toSeq
+    KVUtils.viewToSeq(store.view(classOf[SQLExecutionUIData]))
   }
 
   def executionsList(offset: Int, length: Int): Seq[SQLExecutionUIData] = {
-    store.view(classOf[SQLExecutionUIData]).skip(offset).max(length).asScala.toSeq
+    KVUtils.viewToSeq(store.view(classOf[SQLExecutionUIData]).skip(offset).max(length))
   }
 
   def execution(executionId: Long): Option[SQLExecutionUIData] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala
index 6a3b4eeb672..b6d27008b85 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala
@@ -32,21 +32,21 @@ class StreamingQueryStatusStore(store: KVStore) {
 
   def allQueryUIData: Seq[StreamingQueryUIData] = {
     val view = store.view(classOf[StreamingQueryData]).index("startTimestamp").first(0L)
-    KVUtils.viewToSeq(view, Int.MaxValue)(_ => true).map(makeUIData)
+    KVUtils.viewToSeq(view).map(makeUIData)
   }
 
   // visible for test
   private[sql] def getQueryProgressData(runId: UUID): Seq[StreamingQueryProgressWrapper] = {
     val view = store.view(classOf[StreamingQueryProgressWrapper])
       .index("runId").first(runId.toString).last(runId.toString)
-    KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+    KVUtils.viewToSeq(view)
   }
 
   private def makeUIData(summary: StreamingQueryData): StreamingQueryUIData = {
     val runId = summary.runId
     val view = store.view(classOf[StreamingQueryProgressWrapper])
       .index("runId").first(runId).last(runId)
-    val recentProgress = KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+    val recentProgress = KVUtils.viewToSeq(view)
       .map(_.progress).sortBy(_.timestamp).toArray
     StreamingQueryUIData(summary, recentProgress)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
index 55ceab245a9..2e6102b01fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -55,7 +55,7 @@ private[sql] class StreamingQueryStatusListener(
 
   private def cleanupInactiveQueries(count: Long): Unit = {
     val view = store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
-    val inactiveQueries = KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+    val inactiveQueries = KVUtils.viewToSeq(view)
     val numInactiveQueries = inactiveQueries.size
     if (numInactiveQueries <= inactiveQueryStatusRetention) {
       return
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala
index 54809fe6c80..92c7feaf646 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState
+import org.apache.spark.status.KVUtils
 import org.apache.spark.status.KVUtils.KVIndexParam
 import org.apache.spark.util.kvstore.{KVIndex, KVStore}
 
@@ -32,11 +33,11 @@ import org.apache.spark.util.kvstore.{KVIndex, KVStore}
 class HiveThriftServer2AppStatusStore(store: KVStore) {
 
   def getSessionList: Seq[SessionInfo] = {
-    store.view(classOf[SessionInfo]).asScala.toSeq
+    KVUtils.viewToSeq(store.view(classOf[SessionInfo]))
   }
 
   def getExecutionList: Seq[ExecutionInfo] = {
-    store.view(classOf[ExecutionInfo]).asScala.toSeq
+    KVUtils.viewToSeq(store.view(classOf[ExecutionInfo]))
  }
 
   def getOnlineSessionNum: Int = {
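
For readers skimming the diff, a minimal, self-contained sketch of the before/after pattern the commit
message describes is below. `KVUtils.viewToSeq`, `Utils.tryWithResource`, and
`KVStoreView.closeableIterator` are the real APIs shown in the diff above; the `Entry` class, the
example package name, and the use of the in-memory store are hypothetical stand-ins for illustration
(the leak only matters for the disk-backed `LevelDB`/`RocksDB` stores, where close() releases native
resources; for `InMemoryStore` it is a no-op).

    package org.apache.spark.example // hypothetical; KVUtils is private[spark]

    import scala.collection.JavaConverters._

    import org.apache.spark.status.KVUtils
    import org.apache.spark.status.KVUtils.KVIndexParam
    import org.apache.spark.util.kvstore.InMemoryStore

    // Hypothetical entry type; @KVIndexParam marks the class's natural index.
    class Entry(@KVIndexParam val id: String)

    object ViewToSeqSketch {
      def main(args: Array[String]): Unit = {
        val store = new InMemoryStore()
        Seq("a", "b", "c").foreach(id => store.write(new Entry(id)))

        // Old pattern: KVStoreView is an Iterable, so this compiles and runs,
        // but the KVStoreIterator it opens is only closed whenever finalize()
        // eventually runs on it.
        val leaky = store.view(classOf[Entry]).asScala.toList

        // New pattern: viewToSeq opens a closeable iterator, drains it into a
        // List, and closes it before returning, so nothing is left to the GC.
        val safe = KVUtils.viewToSeq(store.view(classOf[Entry]))

        assert(leaky.map(_.id) == safe.map(_.id))
        store.close()
      }
    }

The pre-existing `viewToSeq(view, max)(filter)` overload, visible at the `LogType` call sites in the
diff, still covers the cases that need a size cap or a predicate; the new single-argument overload is
simply the common no-filter case.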