This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 61aa65d9f89 [SPARK-38847][CORE][SQL][SS] Introduce a `viewToSeq` function for `KVUtils` to close `KVStoreIterator` in time
61aa65d9f89 is described below

commit 61aa65d9f897f29813cbbc77b6d0dbad770c8954
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Tue Apr 12 09:25:03 2022 -0500

    [SPARK-38847][CORE][SQL][SS] Introduce a `viewToSeq` function for `KVUtils` to close `KVStoreIterator` in time
    
    ### What changes were proposed in this pull request?
    There are many places in Spark that convert a `KVStoreView` into a Scala
    `List` without closing the underlying `KVStoreIterator`. Those iterators
    are only reclaimed by the `finalize()` methods implemented in `LevelDB`
    and `RocksDB`, which makes `KVStoreIterator` resource recycling
    unpredictable.
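    
    For illustration, this is the kind of pattern being replaced (taken from
    the `FsHistoryProvider` hunk below); the iterator wrapped by `asScala` is
    never closed explicitly:
    
    ```scala
    val stale = listing.view(classOf[LogInfo])
      .index("lastProcessed")
      .last(newLastScanTime - 1)
      .asScala   // wraps a KVStoreIterator that is never explicitly closed
      .toList    // the iterator is only reclaimed once finalize() eventually runs
    ```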
    
    This PR introduces a `viewToSeq` function in `KVUtils` that converts all
    of the data in a `KVStoreView` into a Scala `List` and closes the
    `KVStoreIterator` promptly.
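    
    A minimal sketch of the new helper (added to `KVUtils.scala` in this
    patch) and of how a call site uses it; `Utils.tryWithResource` closes the
    `KVStoreIterator` even if iteration fails:
    
    ```scala
    /** Turns a KVStoreView into a Scala sequence, closing the iterator when done. */
    def viewToSeq[T](view: KVStoreView[T]): Seq[T] = {
      Utils.tryWithResource(view.closeableIterator()) { iter =>
        iter.asScala.toList
      }
    }
    
    // A call site then becomes:
    val stale = KVUtils.viewToSeq(listing.view(classOf[LogInfo])
      .index("lastProcessed")
      .last(newLastScanTime - 1))
    ```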
    
    ### Why are the changes needed?
    Adds a general function that converts a `KVStoreView` into a Scala `List`
    and closes the `KVStoreIterator` promptly.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Pass GA
    
    Closes #36132 from LuciferYang/kvutils-viewToSeq.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../spark/deploy/history/FsHistoryProvider.scala   | 26 +++++++---------------
 .../deploy/history/HistoryServerDiskManager.scala  |  8 +++----
 .../apache/spark/status/AppStatusListener.scala    | 10 ++++-----
 .../org/apache/spark/status/AppStatusStore.scala   |  2 +-
 .../scala/org/apache/spark/status/KVUtils.scala    |  8 +++++++
 .../deploy/history/FsHistoryProviderSuite.scala    | 10 ++++-----
 .../spark/status/AppStatusListenerSuite.scala      | 23 +++++++++----------
 .../spark/sql/execution/ui/SQLAppStatusStore.scala |  6 ++---
 .../execution/ui/StreamingQueryStatusStore.scala   |  6 ++---
 .../ui/StreamingQueryStatusListener.scala          |  2 +-
 .../ui/HiveThriftServer2AppStatusStore.scala       |  5 +++--
 11 files changed, 50 insertions(+), 56 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index a9adaed374a..dddb7da617f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -592,11 +592,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
      // Only entries with valid applications are cleaned up here. Cleaning up invalid log
      // files is done by the periodic cleaner task.
       val stale = listing.synchronized {
-        listing.view(classOf[LogInfo])
+        KVUtils.viewToSeq(listing.view(classOf[LogInfo])
           .index("lastProcessed")
-          .last(newLastScanTime - 1)
-          .asScala
-          .toList
+          .last(newLastScanTime - 1))
       }
       stale.filterNot(isProcessing)
         .filterNot(info => notStale.contains(info.logPath))
@@ -957,12 +955,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
     val maxNum = conf.get(MAX_LOG_NUM)
 
-    val expired = listing.view(classOf[ApplicationInfoWrapper])
+    val expired = KVUtils.viewToSeq(listing.view(classOf[ApplicationInfoWrapper])
       .index("oldestAttempt")
       .reverse()
-      .first(maxTime)
-      .asScala
-      .toList
+      .first(maxTime))
     expired.foreach { app =>
      // Applications may have multiple attempts, some of which may not need to be deleted yet.
       val (remaining, toDelete) = app.attempts.partition { attempt =>
@@ -972,13 +968,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
 
    // Delete log files that don't have a valid application and exceed the configured max age.
-    val stale = listing.view(classOf[LogInfo])
+    val stale = KVUtils.viewToSeq(listing.view(classOf[LogInfo])
       .index("lastProcessed")
       .reverse()
-      .first(maxTime)
-      .asScala
-      .filter { l => l.logType == null || l.logType == LogType.EventLogs }
-      .toList
+      .first(maxTime), Int.MaxValue) { l => l.logType == null || l.logType == LogType.EventLogs }
     stale.filterNot(isProcessing).foreach { log =>
       if (log.appId.isEmpty) {
         logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
@@ -1080,13 +1073,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
     // Delete driver log file entries that exceed the configured max age and
     // may have been deleted on filesystem externally.
-    val stale = listing.view(classOf[LogInfo])
+    val stale = KVUtils.viewToSeq(listing.view(classOf[LogInfo])
       .index("lastProcessed")
       .reverse()
-      .first(maxTime)
-      .asScala
-      .filter { l => l.logType != null && l.logType == LogType.DriverLogs }
-      .toList
+      .first(maxTime), Int.MaxValue) { l => l.logType != null && l.logType == LogType.DriverLogs }
     stale.filterNot(isProcessing).foreach { log =>
       logInfo(s"Deleting invalid driver log ${log.logPath}")
       listing.delete(classOf[LogInfo], log.logPath)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
index 72d407d8643..6759d890d0d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
@@ -20,7 +20,6 @@ package org.apache.spark.deploy.history
 import java.io.File
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, ListBuffer}
 
 import org.apache.commons.io.FileUtils
@@ -29,6 +28,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.History._
 import org.apache.spark.internal.config.History.HybridStoreDiskBackend.LEVELDB
+import org.apache.spark.status.KVUtils
 import org.apache.spark.status.KVUtils._
 import org.apache.spark.util.{Clock, Utils}
 import org.apache.spark.util.kvstore.KVStore
@@ -78,10 +78,8 @@ private class HistoryServerDiskManager(
 
    // Go through the recorded store directories and remove any that may have been removed by
    // external code.
-    val (existences, orphans) = listing
-      .view(classOf[ApplicationStoreInfo])
-      .asScala
-      .toSeq
+    val (existences, orphans) = KVUtils.viewToSeq(listing
+      .view(classOf[ApplicationStoreInfo]))
       .partition { info =>
         new File(info.path).exists()
       }
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 35c43b06c28..06008988947 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -1249,8 +1249,8 @@ private[spark] class AppStatusListener(
 
     if (dead > threshold) {
       val countToDelete = calculateNumberToRemove(dead, threshold)
-      val toDelete = kvstore.view(classOf[ExecutorSummaryWrapper]).index("active")
-        .max(countToDelete).first(false).last(false).asScala.toSeq
+      val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[ExecutorSummaryWrapper]).index("active")
+        .max(countToDelete).first(false).last(false))
       toDelete.foreach { e => kvstore.delete(e.getClass(), e.info.id) }
     }
   }
@@ -1406,12 +1406,10 @@ private[spark] class AppStatusListener(
   }
 
   private def cleanupCachedQuantiles(stageKey: Array[Int]): Unit = {
-    val cachedQuantiles = kvstore.view(classOf[CachedQuantile])
+    val cachedQuantiles = KVUtils.viewToSeq(kvstore.view(classOf[CachedQuantile])
       .index("stage")
       .first(stageKey)
-      .last(stageKey)
-      .asScala
-      .toList
+      .last(stageKey))
     cachedQuantiles.foreach { q =>
       kvstore.delete(q.getClass(), q.id)
     }
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 03767ee83a9..34155e3e330 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -691,7 +691,7 @@ private[spark] class AppStatusStore(
   }
 
   def streamBlocksList(): Seq[StreamBlockData] = {
-    store.view(classOf[StreamBlockData]).asScala.toSeq
+    KVUtils.viewToSeq(store.view(classOf[StreamBlockData]))
   }
 
   def operationGraphForStage(stageId: Int): RDDOperationGraph = {
diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
index 7a4b613ac06..e422bf3c05a 100644
--- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala
+++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.History.HYBRID_STORE_DISK_BACKEND
 import org.apache.spark.internal.config.History.HybridStoreDiskBackend
 import org.apache.spark.internal.config.History.HybridStoreDiskBackend._
+import org.apache.spark.util.Utils
 import org.apache.spark.util.kvstore._
 
 private[spark] object KVUtils extends Logging {
@@ -92,6 +93,13 @@ private[spark] object KVUtils extends Logging {
     }
   }
 
+  /** Turns a KVStoreView into a Scala sequence. */
+  def viewToSeq[T](view: KVStoreView[T]): Seq[T] = {
+    Utils.tryWithResource(view.closeableIterator()) { iter =>
+      iter.asScala.toList
+    }
+  }
+
   private[spark] class MetadataMismatchException extends Exception
 
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index b05b9de68dc..398a220ebd1 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -23,7 +23,6 @@ import java.util.{Date, Locale}
 import java.util.concurrent.TimeUnit
 import java.util.zip.{ZipInputStream, ZipOutputStream}
 
-import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 
 import com.google.common.io.{ByteStreams, Files}
@@ -50,6 +49,7 @@ import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.security.GroupMappingServiceProvider
 import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.KVUtils
 import org.apache.spark.status.KVUtils.KVStoreScalaSerializer
 import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
 import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}
@@ -684,12 +684,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     log3.setLastModified(clock.getTimeMillis())
     // This should not trigger any cleanup
     provider.cleanDriverLogs()
-    provider.listing.view(classOf[LogInfo]).iterator().asScala.toSeq.size should be(3)
+    KVUtils.viewToSeq(provider.listing.view(classOf[LogInfo])).size should be(3)
 
     // Should trigger cleanup for first file but not second one
    clock.setTime(firstFileModifiedTime + TimeUnit.SECONDS.toMillis(maxAge) + 1)
     provider.cleanDriverLogs()
-    provider.listing.view(classOf[LogInfo]).iterator().asScala.toSeq.size should be(2)
+    KVUtils.viewToSeq(provider.listing.view(classOf[LogInfo])).size should be(2)
     assert(!log1.exists())
     assert(log2.exists())
     assert(log3.exists())
@@ -700,7 +700,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
    // Should cleanup the second file but not the third file, as filelength changed.
    clock.setTime(secondFileModifiedTime + TimeUnit.SECONDS.toMillis(maxAge) + 1)
     provider.cleanDriverLogs()
-    provider.listing.view(classOf[LogInfo]).iterator().asScala.toSeq.size should be(1)
+    KVUtils.viewToSeq(provider.listing.view(classOf[LogInfo])).size should be(1)
     assert(!log1.exists())
     assert(!log2.exists())
     assert(log3.exists())
@@ -708,7 +708,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     // Should cleanup the third file as well.
    clock.setTime(secondFileModifiedTime + 2 * TimeUnit.SECONDS.toMillis(maxAge) + 2)
     provider.cleanDriverLogs()
-    provider.listing.view(classOf[LogInfo]).iterator().asScala.toSeq.size should be(0)
+    KVUtils.viewToSeq(provider.listing.view(classOf[LogInfo])).size should be(0)
     assert(!log3.exists())
   }
 
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 105b447f9c7..f5ccce7f1d9 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -254,8 +254,8 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
         assert(stage.info.memoryBytesSpilled === s1Tasks.size * value)
       }
 
-      val execs = store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
-        .first(key(stages.head)).last(key(stages.head)).asScala.toSeq
+      val execs = KVUtils.viewToSeq(store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
+        .first(key(stages.head)).last(key(stages.head)))
       assert(execs.size > 0)
       execs.foreach { exec =>
         assert(exec.info.memoryBytesSpilled === s1Tasks.size * value / 2)
@@ -272,10 +272,9 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
       stageAttemptId = stages.head.attemptNumber))
 
     val executorStageSummaryWrappers =
-      store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
+      KVUtils.viewToSeq(store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
         .first(key(stages.head))
-        .last(key(stages.head))
-        .asScala.toSeq
+        .last(key(stages.head)))
 
     assert(executorStageSummaryWrappers.nonEmpty)
     executorStageSummaryWrappers.foreach { exec =>
@@ -301,10 +300,9 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
       stageAttemptId = stages.head.attemptNumber))
 
     val executorStageSummaryWrappersForNode =
-      store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
+      KVUtils.viewToSeq(store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
         .first(key(stages.head))
-        .last(key(stages.head))
-        .asScala.toSeq
+        .last(key(stages.head)))
 
     assert(executorStageSummaryWrappersForNode.nonEmpty)
     executorStageSummaryWrappersForNode.foreach { exec =>
@@ -1364,13 +1362,13 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
         TaskKilled(reason = "Killed"), tasks(1), new ExecutorMetrics, null))
 
     // Ensure killed task metrics are updated
-    val allStages = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
+    val allStages = KVUtils.viewToSeq(store.view(classOf[StageDataWrapper]).reverse()).map(_.info)
     val failedStages = allStages.filter(_.status == v1.StageStatus.FAILED)
     assert(failedStages.size == 1)
     assert(failedStages.head.numKilledTasks == 1)
     assert(failedStages.head.numCompleteTasks == 1)
 
-    val allJobs = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
+    val allJobs = KVUtils.viewToSeq(store.view(classOf[JobDataWrapper]).reverse()).map(_.info)
     assert(allJobs.size == 1)
     assert(allJobs.head.numKilledTasks == 1)
     assert(allJobs.head.numCompletedTasks == 1)
@@ -1427,14 +1425,15 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
         ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), new 
ExecutorMetrics,
         null))
 
-      val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
+      val esummary = KVUtils.viewToSeq(store.view(classOf[ExecutorStageSummaryWrapper])).map(_.info)
       esummary.foreach { execSummary =>
         assert(execSummary.failedTasks === 1)
         assert(execSummary.succeededTasks === 1)
         assert(execSummary.killedTasks === 0)
       }
 
-      val allExecutorSummary = store.view(classOf[ExecutorSummaryWrapper]).asScala.map(_.info)
+      val allExecutorSummary =
+        KVUtils.viewToSeq(store.view(classOf[ExecutorSummaryWrapper])).map(_.info)
       assert(allExecutorSummary.size === 2)
       allExecutorSummary.foreach { allExecSummary =>
         assert(allExecSummary.failedTasks === 1)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
index 7c3315e3d76..95035c08a2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -20,13 +20,13 @@ package org.apache.spark.sql.execution.ui
 import java.lang.{Long => JLong}
 import java.util.Date
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import com.fasterxml.jackson.annotation.JsonIgnore
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 
 import org.apache.spark.JobExecutionStatus
+import org.apache.spark.status.KVUtils
 import org.apache.spark.status.KVUtils.KVIndexParam
 import org.apache.spark.util.kvstore.{KVIndex, KVStore}
 
@@ -39,11 +39,11 @@ class SQLAppStatusStore(
     val listener: Option[SQLAppStatusListener] = None) {
 
   def executionsList(): Seq[SQLExecutionUIData] = {
-    store.view(classOf[SQLExecutionUIData]).asScala.toSeq
+    KVUtils.viewToSeq(store.view(classOf[SQLExecutionUIData]))
   }
 
   def executionsList(offset: Int, length: Int): Seq[SQLExecutionUIData] = {
-    store.view(classOf[SQLExecutionUIData]).skip(offset).max(length).asScala.toSeq
+    KVUtils.viewToSeq(store.view(classOf[SQLExecutionUIData]).skip(offset).max(length))
   }
 
   def execution(executionId: Long): Option[SQLExecutionUIData] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala
index 6a3b4eeb672..b6d27008b85 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala
@@ -32,21 +32,21 @@ class StreamingQueryStatusStore(store: KVStore) {
 
   def allQueryUIData: Seq[StreamingQueryUIData] = {
    val view = store.view(classOf[StreamingQueryData]).index("startTimestamp").first(0L)
-    KVUtils.viewToSeq(view, Int.MaxValue)(_ => true).map(makeUIData)
+    KVUtils.viewToSeq(view).map(makeUIData)
   }
 
   // visible for test
  private[sql] def getQueryProgressData(runId: UUID): Seq[StreamingQueryProgressWrapper] = {
     val view = store.view(classOf[StreamingQueryProgressWrapper])
       .index("runId").first(runId.toString).last(runId.toString)
-    KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+    KVUtils.viewToSeq(view)
   }
 
   private def makeUIData(summary: StreamingQueryData): StreamingQueryUIData = {
     val runId = summary.runId
     val view = store.view(classOf[StreamingQueryProgressWrapper])
       .index("runId").first(runId).last(runId)
-    val recentProgress = KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+    val recentProgress = KVUtils.viewToSeq(view)
       .map(_.progress).sortBy(_.timestamp).toArray
     StreamingQueryUIData(summary, recentProgress)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
index 55ceab245a9..2e6102b01fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -55,7 +55,7 @@ private[sql] class StreamingQueryStatusListener(
 
   private def cleanupInactiveQueries(count: Long): Unit = {
    val view = store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
-    val inactiveQueries = KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+    val inactiveQueries = KVUtils.viewToSeq(view)
     val numInactiveQueries = inactiveQueries.size
     if (numInactiveQueries <= inactiveQueryStatusRetention) {
       return
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala
index 54809fe6c80..92c7feaf646 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState
+import org.apache.spark.status.KVUtils
 import org.apache.spark.status.KVUtils.KVIndexParam
 import org.apache.spark.util.kvstore.{KVIndex, KVStore}
 
@@ -32,11 +33,11 @@ import org.apache.spark.util.kvstore.{KVIndex, KVStore}
 class HiveThriftServer2AppStatusStore(store: KVStore) {
 
   def getSessionList: Seq[SessionInfo] = {
-    store.view(classOf[SessionInfo]).asScala.toSeq
+    KVUtils.viewToSeq(store.view(classOf[SessionInfo]))
   }
 
   def getExecutionList: Seq[ExecutionInfo] = {
-    store.view(classOf[ExecutionInfo]).asScala.toSeq
+    KVUtils.viewToSeq(store.view(classOf[ExecutionInfo]))
   }
 
   def getOnlineSessionNum: Int = {

