This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fba68e6f20b [SPARK-38896][CORE][SQL] Use `tryWithResource` to release `LevelDB/RocksDBIterator` resources earlier
fba68e6f20b is described below

commit fba68e6f20b88779dc5ca78742958952b3d7acf0
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Thu Apr 28 18:51:28 2022 -0500

    [SPARK-38896][CORE][SQL] Use `tryWithResource` to release `LevelDB/RocksDBIterator` resources earlier
    
    ### What changes were proposed in this pull request?
    Similar to SPARK-38847, this PR aims to release the
    `LevelDB/RocksDBIterator` resources earlier by using `tryWithResource`.
    The main changes of this PR are as follows:
    
    1. Use Java try-with-resources and Spark `Utils.tryWithResource` to
    recycle the `KVStoreIterator` instances opened by
    `LevelDB.view(Class<T> type).iterator` and
    `RocksDB.view(Class<T> type).iterator` (a minimal sketch of the Scala
    pattern follows this list).
    2. Introduce 4 new functions in `KVUtils` (`count|foreach|mapToSeq|size`);
    these functions close the `KVStoreIterator` promptly.
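
    For illustration only, here is a minimal sketch of the Scala-side
    pattern. The `KVStore` handle `db` and the stored type `Entry` are
    hypothetical; `Utils.tryWithResource`, `closeableIterator`, and
    `KVIndexParam` are the existing APIs this PR builds on:

    ```scala
    import scala.collection.JavaConverters._

    import org.apache.spark.status.KVUtils.KVIndexParam
    import org.apache.spark.util.Utils
    import org.apache.spark.util.kvstore.KVStore

    // Hypothetical stored type, defined only for this sketch.
    class Entry(@KVIndexParam val name: String)

    def namesOf(db: KVStore): Seq[String] = {
      // The iterator is closed as soon as this block exits, rather than
      // lingering until the underlying LevelDB/RocksDB store is closed.
      Utils.tryWithResource(db.view(classOf[Entry]).closeableIterator()) { iter =>
        iter.asScala.map(_.name).toList // materialize before the iterator closes
      }
    }
    ```

    The Java-side counterpart is the `try (KVStoreIterator<T> it = ...) { ... }`
    form visible in the `LevelDB.java` and `RocksDB.java` hunks below.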
    
    ### Why are the changes needed?
    Release the `LevelDB/RocksDBIterator` resources earlier
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GA
    
    Closes #36237 from LuciferYang/Manual-Close-KVStoreIterator.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../org/apache/spark/util/kvstore/LevelDB.java     | 12 +++--
 .../org/apache/spark/util/kvstore/RocksDB.java     | 12 +++--
 .../apache/spark/util/kvstore/DBIteratorSuite.java |  6 ++-
 .../spark/util/kvstore/LevelDBBenchmark.java       | 12 +++--
 .../apache/spark/util/kvstore/LevelDBSuite.java    | 29 ++++++-----
 .../spark/util/kvstore/RocksDBBenchmark.java       | 13 +++--
 .../apache/spark/util/kvstore/RocksDBSuite.java    | 29 ++++++-----
 .../spark/deploy/history/FsHistoryProvider.scala   | 15 ++----
 .../apache/spark/status/AppStatusListener.scala    |  2 +-
 .../org/apache/spark/status/AppStatusStore.scala   | 58 ++++++++++++----------
 .../scala/org/apache/spark/status/KVUtils.scala    | 37 ++++++++++++++
 .../spark/deploy/history/HybridStoreSuite.scala    | 18 +++++--
 .../spark/status/AppStatusListenerSuite.scala      |  3 +-
 .../spark/sql/diagnostic/DiagnosticStore.scala     | 10 ++--
 .../ui/HiveThriftServer2AppStatusStore.scala       |  8 +--
 .../ui/HiveThriftServer2Listener.scala             |  3 +-
 16 files changed, 170 insertions(+), 97 deletions(-)

diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
index 6b28373a480..b50906e2cba 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
@@ -270,10 +270,14 @@ public class LevelDB implements KVStore {
     KVStoreView<T> view = view(klass).index(index);
 
     for (Object indexValue : indexValues) {
-      for (T value: view.first(indexValue).last(indexValue)) {
-        Object itemKey = naturalIndex.getValue(value);
-        delete(klass, itemKey);
-        removed = true;
+      try (KVStoreIterator<T> iterator =
+        view.first(indexValue).last(indexValue).closeableIterator()) {
+        while (iterator.hasNext()) {
+          T value = iterator.next();
+          Object itemKey = naturalIndex.getValue(value);
+          delete(klass, itemKey);
+          removed = true;
+        }
       }
     }
 
diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
index 7674bc52dc7..d328e5c79d3 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
@@ -303,10 +303,14 @@ public class RocksDB implements KVStore {
     KVStoreView<T> view = view(klass).index(index);
 
     for (Object indexValue : indexValues) {
-      for (T value: view.first(indexValue).last(indexValue)) {
-        Object itemKey = naturalIndex.getValue(value);
-        delete(klass, itemKey);
-        removed = true;
+      try (KVStoreIterator<T> iterator =
+        view.first(indexValue).last(indexValue).closeableIterator()) {
+        while (iterator.hasNext()) {
+          T value = iterator.next();
+          Object itemKey = naturalIndex.getValue(value);
+          delete(klass, itemKey);
+          removed = true;
+        }
       }
     }
 
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java
index ab1e2728585..223f3f93a87 100644
--- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java
@@ -490,11 +490,15 @@ public abstract class DBIteratorSuite {
   }
 
   private KVStoreView<CustomType1> view() throws Exception {
+    // SPARK-38896: this `view` will be closed in
+    // the `collect(KVStoreView<CustomType1> view)` method.
     return db.view(CustomType1.class);
   }
 
   private List<CustomType1> collect(KVStoreView<CustomType1> view) throws Exception {
-    return Arrays.asList(Iterables.toArray(view, CustomType1.class));
+    try (KVStoreIterator<CustomType1> iterator = view.closeableIterator()) {
+      return Lists.newArrayList(iterator);
+    }
   }
 
   private List<CustomType1> sortBy(Comparator<CustomType1> comp) {
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBBenchmark.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBBenchmark.java
index f2a91f916a3..9082e1887bf 100644
--- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBBenchmark.java
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBBenchmark.java
@@ -197,9 +197,15 @@ public class LevelDBBenchmark {
       }
     }
 
-    while (it.hasNext()) {
-      try(Timer.Context ctx = iter.time()) {
-        it.next();
+    try {
+      while (it.hasNext()) {
+        try (Timer.Context ctx = iter.time()) {
+          it.next();
+        }
+      }
+    } finally {
+      if (it != null) {
+        it.close();
       }
     }
   }
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
index a7a2148c02d..ef0ccd4a639 100644
--- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Spliterators;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -236,13 +237,14 @@ public class LevelDBSuite {
       db.write(createCustomType1(i));
     }
 
-    KVStoreIterator<CustomType1> it = db.view(CustomType1.class).closeableIterator();
-    assertTrue(it.hasNext());
-    assertTrue(it.skip(5));
-    assertEquals("key5", it.next().key);
-    assertTrue(it.skip(3));
-    assertEquals("key9", it.next().key);
-    assertFalse(it.hasNext());
+    try (KVStoreIterator<CustomType1> it = db.view(CustomType1.class).closeableIterator()) {
+      assertTrue(it.hasNext());
+      assertTrue(it.skip(5));
+      assertEquals("key5", it.next().key);
+      assertTrue(it.skip(3));
+      assertEquals("key9", it.next().key);
+      assertFalse(it.hasNext());
+    }
   }
 
   @Test
@@ -257,12 +259,15 @@ public class LevelDBSuite {
       }
     });
 
-    List<Integer> results = StreamSupport
-      .stream(db.view(CustomType1.class).index("int").spliterator(), false)
-      .map(e -> e.num)
-      .collect(Collectors.toList());
+    try (KVStoreIterator<CustomType1> iterator =
+      db.view(CustomType1.class).index("int").closeableIterator()) {
+      List<Integer> results = StreamSupport
+        .stream(Spliterators.spliteratorUnknownSize(iterator, 0), false)
+        .map(e -> e.num)
+        .collect(Collectors.toList());
 
-    assertEquals(expected, results);
+      assertEquals(expected, results);
+    }
   }
 
   @Test
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBBenchmark.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBBenchmark.java
index 4517a47b32f..25930bb1013 100644
--- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBBenchmark.java
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBBenchmark.java
@@ -196,10 +196,15 @@ public class RocksDBBenchmark {
         }
       }
     }
-
-    while (it.hasNext()) {
-      try(Timer.Context ctx = iter.time()) {
-        it.next();
+    try {
+      while (it.hasNext()) {
+        try (Timer.Context ctx = iter.time()) {
+          it.next();
+        }
+      }
+    } finally {
+      if (it != null) {
+        it.close();
       }
     }
   }
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java
index 8112cbf04b3..a3ac40efdfb 100644
--- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Spliterators;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -234,13 +235,14 @@ public class RocksDBSuite {
       db.write(createCustomType1(i));
     }
 
-    KVStoreIterator<CustomType1> it = db.view(CustomType1.class).closeableIterator();
-    assertTrue(it.hasNext());
-    assertTrue(it.skip(5));
-    assertEquals("key5", it.next().key);
-    assertTrue(it.skip(3));
-    assertEquals("key9", it.next().key);
-    assertFalse(it.hasNext());
+    try (KVStoreIterator<CustomType1> it = db.view(CustomType1.class).closeableIterator()) {
+      assertTrue(it.hasNext());
+      assertTrue(it.skip(5));
+      assertEquals("key5", it.next().key);
+      assertTrue(it.skip(3));
+      assertEquals("key9", it.next().key);
+      assertFalse(it.hasNext());
+    }
   }
 
   @Test
@@ -255,12 +257,15 @@ public class RocksDBSuite {
       }
     });
 
-    List<Integer> results = StreamSupport
-      .stream(db.view(CustomType1.class).index("int").spliterator(), false)
-      .map(e -> e.num)
-      .collect(Collectors.toList());
+    try (KVStoreIterator<CustomType1> iterator =
+      db.view(CustomType1.class).index("int").closeableIterator()) {
+      List<Integer> results = StreamSupport
+        .stream(Spliterators.spliteratorUnknownSize(iterator, 0), false)
+        .map(e -> e.num)
+        .collect(Collectors.toList());
 
-    assertEquals(expected, results);
+      assertEquals(expected, results);
+    }
   }
 
   @Test
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index dddb7da617f..e1b4104eb77 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -325,12 +325,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   override def getListing(): Iterator[ApplicationInfo] = {
     // Return the listing in end time descending order.
-    listing.view(classOf[ApplicationInfoWrapper])
-      .index("endTime")
-      .reverse()
-      .iterator()
-      .asScala
-      .map(_.toApplicationInfo())
+    KVUtils.mapToSeq(listing.view(classOf[ApplicationInfoWrapper])
+      .index("endTime").reverse())(_.toApplicationInfo()).iterator
   }
 
   override def getApplicationInfo(appId: String): Option[ApplicationInfo] = {
@@ -982,14 +978,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
     // If the number of files is bigger than MAX_LOG_NUM,
     // clean up all completed attempts per application one by one.
-    val num = listing.view(classOf[LogInfo]).index("lastProcessed").asScala.size
+    val num = KVUtils.size(listing.view(classOf[LogInfo]).index("lastProcessed"))
     var count = num - maxNum
     if (count > 0) {
       logInfo(s"Try to delete $count old event logs to keep $maxNum logs in total.")
-      val oldAttempts = listing.view(classOf[ApplicationInfoWrapper])
-        .index("oldestAttempt")
-        .asScala
-      oldAttempts.foreach { app =>
+      KVUtils.foreach(listing.view(classOf[ApplicationInfoWrapper]).index("oldestAttempt")) { app =>
         if (count > 0) {
           // Applications may have multiple attempts, some of which may not be completed yet.
           val (toDelete, remaining) = app.attempts.partition(_.info.completed)
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 06008988947..add9862c306 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -1276,7 +1276,7 @@ private[spark] class AppStatusListener(
   private def cleanupStagesWithInMemoryStore(countToDelete: Long): Seq[Array[Int]] = {
     val stageArray = new ArrayBuffer[StageCompletionTime]()
     val stageDataCount = new mutable.HashMap[Int, Int]()
-    kvstore.view(classOf[StageDataWrapper]).forEach { s =>
+    KVUtils.foreach(kvstore.view(classOf[StageDataWrapper])) { s =>
       // Here we keep track of the total number of StageDataWrapper entries for each stage id.
       // This will be used in cleaning up the RDDOperationGraphWrapper data.
       if (stageDataCount.contains(s.info.stageId)) {
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index b455850d609..4c0ac5e3192 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -65,15 +65,15 @@ private[spark] class AppStatusStore(
   }
 
   def resourceProfileInfo(): Seq[v1.ResourceProfileInfo] = {
-    store.view(classOf[ResourceProfileWrapper]).asScala.map(_.rpInfo).toSeq
+    KVUtils.mapToSeq(store.view(classOf[ResourceProfileWrapper]))(_.rpInfo)
   }
 
   def jobsList(statuses: JList[JobExecutionStatus]): Seq[v1.JobData] = {
-    val it = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
+    val it = KVUtils.mapToSeq(store.view(classOf[JobDataWrapper]).reverse())(_.info)
     if (statuses != null && !statuses.isEmpty()) {
-      it.filter { job => statuses.contains(job.status) }.toSeq
+      it.filter { job => statuses.contains(job.status) }
     } else {
-      it.toSeq
+      it
     }
   }
 
@@ -95,9 +95,9 @@ private[spark] class AppStatusStore(
     } else {
       base
     }
-    filtered.asScala.map(_.info)
+    KVUtils.mapToSeq(filtered)(_.info)
       .filter(_.id != FALLBACK_BLOCK_MANAGER_ID.executorId)
-      .map(replaceExec).toSeq
+      .map(replaceExec)
   }
 
   private def replaceExec(origin: v1.ExecutorSummary): v1.ExecutorSummary = {
@@ -155,7 +155,7 @@ private[spark] class AppStatusStore(
     } else {
       base
     }
-    filtered.asScala.map(_.info).toSeq
+    KVUtils.mapToSeq(filtered)(_.info)
   }
 
   def executorSummary(executorId: String): v1.ExecutorSummary = {
@@ -177,12 +177,13 @@ private[spark] class AppStatusStore(
     unsortedQuantiles: Array[Double] = Array.empty,
     taskStatus: JList[v1.TaskStatus] = List().asJava): Seq[v1.StageData] = {
     val quantiles = unsortedQuantiles.sorted
-    val it = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
+    val it = KVUtils.mapToSeq(store.view(classOf[StageDataWrapper]).reverse())(_.info)
     val ret = if (statuses != null && !statuses.isEmpty()) {
-      it.filter { s => statuses.contains(s.status) }.toSeq
+      it.filter { s => statuses.contains(s.status) }
     } else {
-      it.toSeq
+      it
     }
+
     ret.map { s =>
       newStageData(s, withDetail = details, taskStatus = taskStatus,
         withSummaries = withSummaries, unsortedQuantiles = quantiles)
@@ -195,11 +196,11 @@ private[spark] class AppStatusStore(
     taskStatus: JList[v1.TaskStatus] = List().asJava,
     withSummaries: Boolean = false,
     unsortedQuantiles: Array[Double] = Array.empty[Double]): Seq[v1.StageData] = {
-    store.view(classOf[StageDataWrapper]).index("stageId").first(stageId).last(stageId)
-      .asScala.map { s =>
-        newStageData(s.info, withDetail = details, taskStatus = taskStatus,
-          withSummaries = withSummaries, unsortedQuantiles = unsortedQuantiles)
-      }.toSeq
+    KVUtils.mapToSeq(store.view(classOf[StageDataWrapper]).index("stageId")
+      .first(stageId).last(stageId)) { s =>
+      newStageData(s.info, withDetail = details, taskStatus = taskStatus,
+        withSummaries = withSummaries, unsortedQuantiles = unsortedQuantiles)
+    }
   }
 
   def lastStageAttempt(stageId: Int): v1.StageData = {
@@ -468,9 +469,9 @@ private[spark] class AppStatusStore(
 
   def taskList(stageId: Int, stageAttemptId: Int, maxTasks: Int): Seq[v1.TaskData] = {
     val stageKey = Array(stageId, stageAttemptId)
-    val taskDataWrapperIter = store.view(classOf[TaskDataWrapper]).index("stage")
-      .first(stageKey).last(stageKey).reverse().max(maxTasks).asScala
-    constructTaskDataList(taskDataWrapperIter).reverse
+    val taskDataWrapperSeq = KVUtils.viewToSeq(store.view(classOf[TaskDataWrapper]).index("stage")
+      .first(stageKey).last(stageKey).reverse().max(maxTasks))
+    constructTaskDataList(taskDataWrapperSeq).reverse
   }
 
   def taskList(
@@ -511,20 +512,22 @@ private[spark] class AppStatusStore(
     }
 
     val ordered = if (ascending) indexed else indexed.reverse()
-    val taskDataWrapperIter = if (statuses != null && !statuses.isEmpty) {
+    val taskDataWrapperSeq = if (statuses != null && !statuses.isEmpty) {
       val statusesStr = statuses.asScala.map(_.toString).toSet
-      ordered.asScala.filter(s => statusesStr.contains(s.status)).slice(offset, offset + length)
+      KVUtils.viewToSeq(ordered, offset, offset + length)(s => statusesStr.contains(s.status))
     } else {
-      ordered.skip(offset).max(length).asScala
+      KVUtils.viewToSeq(ordered.skip(offset).max(length))
     }
 
-    constructTaskDataList(taskDataWrapperIter)
+    constructTaskDataList(taskDataWrapperSeq)
   }
 
   def executorSummary(stageId: Int, attemptId: Int): Map[String, v1.ExecutorStageSummary] = {
     val stageKey = Array(stageId, attemptId)
-    store.view(classOf[ExecutorStageSummaryWrapper]).index("stage").first(stageKey).last(stageKey)
-      .asScala.map { exec => (exec.executorId -> exec.info) }.toMap
+    KVUtils.mapToSeq(store.view(classOf[ExecutorStageSummaryWrapper])
+      .index("stage").first(stageKey).last(stageKey)) { exec =>
+      (exec.executorId -> exec.info)
+    }.toMap
   }
 
   def speculationSummary(stageId: Int, attemptId: Int): Option[v1.SpeculationStageSummary] = {
@@ -533,9 +536,10 @@ private[spark] class AppStatusStore(
   }
 
   def rddList(cachedOnly: Boolean = true): Seq[v1.RDDStorageInfo] = {
-    store.view(classOf[RDDStorageInfoWrapper]).asScala.map(_.info).filter { rdd =>
-      !cachedOnly || rdd.numCachedPartitions > 0
-    }.toSeq
+    KVUtils.mapToSeq(store.view(classOf[RDDStorageInfoWrapper]))(_.info)
+      .filter { rdd =>
+        !cachedOnly || rdd.numCachedPartitions > 0
+      }
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
index e422bf3c05a..f21f10004b2 100644
--- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala
+++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
@@ -93,6 +93,16 @@ private[spark] object KVUtils extends Logging {
     }
   }
 
+  /** Turns an interval of KVStoreView into a Scala sequence, applying a filter. */
+  def viewToSeq[T](
+      view: KVStoreView[T],
+      from: Int,
+      until: Int)(filter: T => Boolean): Seq[T] = {
+    Utils.tryWithResource(view.closeableIterator()) { iter =>
+      iter.asScala.filter(filter).slice(from, until).toList
+    }
+  }
+
   /** Turns a KVStoreView into a Scala sequence. */
   def viewToSeq[T](view: KVStoreView[T]): Seq[T] = {
     Utils.tryWithResource(view.closeableIterator()) { iter =>
@@ -100,6 +110,33 @@ private[spark] object KVUtils extends Logging {
     }
   }
 
+  /** Counts the number of elements in the KVStoreView which satisfy a predicate. */
+  def count[T](view: KVStoreView[T])(countFunc: T => Boolean): Int = {
+    Utils.tryWithResource(view.closeableIterator()) { iter =>
+      iter.asScala.count(countFunc)
+    }
+  }
+
+  /** Applies a function f to all values produced by KVStoreView. */
+  def foreach[T](view: KVStoreView[T])(foreachFunc: T => Unit): Unit = {
+    Utils.tryWithResource(view.closeableIterator()) { iter =>
+      iter.asScala.foreach(foreachFunc)
+    }
+  }
+
+  /** Maps all values of KVStoreView to new values using a transformation function. */
+  def mapToSeq[T, B](view: KVStoreView[T])(mapFunc: T => B): Seq[B] = {
+    Utils.tryWithResource(view.closeableIterator()) { iter =>
+      iter.asScala.map(mapFunc).toList
+    }
+  }
+
+  def size[T](view: KVStoreView[T]): Int = {
+    Utils.tryWithResource(view.closeableIterator()) { iter =>
+      iter.asScala.size
+    }
+  }
+
   private[spark] class MetadataMismatchException extends Exception
 
 }
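
For illustration, a hedged sketch of how call sites can use the helpers
added above. Here `store` is any `KVStore`, and `SessionInfo` (with its
`sessionId` and `finishTimestamp` fields) is borrowed from the
thriftserver hunk later in this commit and assumed to be in scope:

```scala
import org.apache.spark.status.KVUtils
import org.apache.spark.util.kvstore.KVStore

// Each helper opens a closeableIterator internally and closes it
// before returning, so no iterator outlives the call.
def summarize(store: KVStore): Unit = {
  val online = KVUtils.count(store.view(classOf[SessionInfo]))(_.finishTimestamp == 0)
  val ids = KVUtils.mapToSeq(store.view(classOf[SessionInfo]))(_.sessionId)
  val total = KVUtils.size(store.view(classOf[SessionInfo]))
  println(s"$online of $total sessions online; ids: $ids")
  KVUtils.foreach(store.view(classOf[SessionInfo]))(s => println(s.sessionId))
}
```
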
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala
index 9c06c99e8e6..555759b88e1 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala
@@ -29,6 +29,7 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.status.KVUtils._
 import org.apache.spark.tags.ExtendedLevelDBTest
+import org.apache.spark.util.Utils
 import org.apache.spark.util.kvstore._
 
 abstract class HybridStoreSuite extends SparkFunSuite with BeforeAndAfter with TimeLimits {
@@ -103,6 +104,13 @@ abstract class HybridStoreSuite extends SparkFunSuite with BeforeAndAfter with T
   }
 
   test("test basic iteration") {
+
+    def head[T](view: KVStoreView[T]): T = {
+      Utils.tryWithResource(view.closeableIterator()) { iter =>
+        assert(iter.hasNext)
+        iter.next()
+      }
+    }
     val store = createHybridStore()
 
     val t1 = createCustomType1(1)
@@ -113,11 +121,11 @@ abstract class HybridStoreSuite extends SparkFunSuite with BeforeAndAfter with T
     Seq(false, true).foreach { switch =>
       if (switch) switchHybridStore(store)
 
-      assert(store.view(t1.getClass()).iterator().next().id === t1.id)
-      assert(store.view(t1.getClass()).skip(1).iterator().next().id === t2.id)
-      assert(store.view(t1.getClass()).skip(1).max(1).iterator().next().id === t2.id)
-      assert(store.view(t1.getClass()).first(t1.key).max(1).iterator().next().id === t1.id)
-      assert(store.view(t1.getClass()).first(t2.key).max(1).iterator().next().id === t2.id)
+      assert(head(store.view(t1.getClass)).id === t1.id)
+      assert(head(store.view(t1.getClass()).skip(1)).id === t2.id)
+      assert(head(store.view(t1.getClass()).skip(1).max(1)).id === t2.id)
+      assert(head(store.view(t1.getClass()).first(t1.key).max(1)).id === t1.id)
+      assert(head(store.view(t1.getClass()).first(t2.key).max(1)).id === t2.id)
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index f5ccce7f1d9..ec92877ce94 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.status
 import java.io.File
 import java.util.{Date, Properties}
 
-import scala.collection.JavaConverters._
 import scala.collection.immutable.Map
 import scala.reflect.{classTag, ClassTag}
 
@@ -1671,7 +1670,7 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     }
 
     // check peak executor metric values for each stage and executor
-    val stageExecSummaries = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.toSeq
+    val stageExecSummaries = KVUtils.viewToSeq(store.view(classOf[ExecutorStageSummaryWrapper]))
     stageExecSummaries.foreach { exec =>
       expectedStageValues.get(exec.stageId) match {
         case Some(stageValue) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala
index 236ee104f0e..c13cc8a7f39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala
@@ -17,10 +17,9 @@
 
 package org.apache.spark.sql.diagnostic
 
-import scala.collection.JavaConverters._
-
 import com.fasterxml.jackson.annotation.JsonIgnore
 
+import org.apache.spark.status.KVUtils
 import org.apache.spark.status.KVUtils.KVIndexParam
 import org.apache.spark.util.kvstore.{KVIndex, KVStore}
 
@@ -32,7 +31,7 @@ import org.apache.spark.util.kvstore.{KVIndex, KVStore}
 class DiagnosticStore(store: KVStore) {
 
   def diagnosticsList(offset: Int, length: Int): Seq[ExecutionDiagnosticData] = {
-    store.view(classOf[ExecutionDiagnosticData]).skip(offset).max(length).asScala.toSeq
+    KVUtils.viewToSeq(store.view(classOf[ExecutionDiagnosticData]).skip(offset).max(length))
   }
 
   def diagnostic(executionId: Long): Option[ExecutionDiagnosticData] = {
@@ -44,11 +43,10 @@ class DiagnosticStore(store: KVStore) {
   }
 
   def adaptiveExecutionUpdates(executionId: Long): Seq[AdaptiveExecutionUpdate] = {
+    KVUtils.viewToSeq(
     store.view(classOf[AdaptiveExecutionUpdate])
       .index("updateTime")
-      .parent(executionId)
-      .asScala
-      .toSeq
+      .parent(executionId))
   }
 }
 
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala
index 92c7feaf646..58520854936 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.hive.thriftserver.ui
 
-import com.fasterxml.jackson.annotation.JsonIgnore
-import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
+import com.fasterxml.jackson.annotation.JsonIgnore
+
 import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState
 import org.apache.spark.status.KVUtils
 import org.apache.spark.status.KVUtils.KVIndexParam
@@ -41,7 +41,7 @@ class HiveThriftServer2AppStatusStore(store: KVStore) {
   }
 
   def getOnlineSessionNum: Int = {
-    store.view(classOf[SessionInfo]).asScala.count(_.finishTimestamp == 0)
+    KVUtils.count(store.view(classOf[SessionInfo]))(_.finishTimestamp == 0)
   }
 
   def getSession(sessionId: String): Option[SessionInfo] = {
@@ -66,7 +66,7 @@ class HiveThriftServer2AppStatusStore(store: KVStore) {
    * cancellations and count all statements that have not been closed so far.
    */
   def getTotalRunning: Int = {
-    store.view(classOf[ExecutionInfo]).asScala.count(_.isExecutionActive)
+    KVUtils.count(store.view(classOf[ExecutionInfo]))(_.isExecutionActive)
   }
 
   def getSessionCount: Long = {
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
index 7b2da6970fb..5ccc72c7782 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
@@ -101,7 +101,8 @@ private[thriftserver] class HiveThriftServer2Listener(
       // Execution end event (Refer SPARK-27019). To handle that situation, if occurs in
       // Thriftserver, following code will take care. Here will come only if JobStart event comes
       // after Execution End event.
-      val storeExecInfo = kvstore.view(classOf[ExecutionInfo]).asScala.filter(_.groupId == groupId)
+      val storeExecInfo = KVUtils.viewToSeq(
+        kvstore.view(classOf[ExecutionInfo]), Int.MaxValue)(_.groupId == groupId)
       storeExecInfo.foreach { exec =>
         val liveExec = getOrCreateExecution(exec.execId, exec.statement, exec.sessionId,
           exec.startTimestamp, exec.userName)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
