This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 3e26876  [SPARK-30964][CORE][WEBUI] Accelerate InMemoryStore with a new index
3e26876 is described below

commit 3e268763c345076801f1ff6f75bab67ecab87e8f
Author: Gengliang Wang <gengliang.w...@databricks.com>
AuthorDate: Mon Mar 2 15:48:48 2020 +0800

    [SPARK-30964][CORE][WEBUI] Accelerate InMemoryStore with a new index
    
    ### What changes were proposed in this pull request?
    
    Spark uses the class `InMemoryStore` as the KV storage for the live UI and the history server (by default, if no LevelDB file path is provided).
    In `InMemoryStore`, all the task data of one application is stored in a hashmap whose key is the task ID and whose value is the task data. This is fine for getting or deleting an entry by task ID.
    However, the Spark stage UI always shows all the task data of one stage, and the current implementation scans all the values in the hashmap to find them. The time complexity is O(numOfTasks).
    Also, when there are too many stages (> spark.ui.retainedStages), Spark linearly scans all the task data to find the tasks of the stages to be deleted.
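    
    A minimal sketch of that scan, not Spark's actual code: `ScanTask` and `LinearScanSketch` are hypothetical names, and a plain map keyed by task ID stands in for the hashmap described above. Finding the tasks of one stage has to visit every task of the application.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical stand-in for a task record; not Spark's TaskDataWrapper.
    final class ScanTask {
      final long taskId;
      final int stageId;

      ScanTask(long taskId, int stageId) {
        this.taskId = taskId;
        this.stageId = stageId;
      }
    }

    // Hypothetical store that only knows the natural key (the task ID).
    final class LinearScanSketch {
      private final Map<Long, ScanTask> data = new ConcurrentHashMap<>();

      // Number of entries inspected so far, kept only to make the cost visible.
      long visited = 0;

      void put(ScanTask task) {
        data.put(task.taskId, task);
      }

      // Visits every task of the application, so the cost is O(numOfTasks)
      // no matter how few tasks the requested stage has.
      List<ScanTask> tasksOfStage(int stageId) {
        List<ScanTask> result = new ArrayList<>();
        for (ScanTask task : data.values()) {
          visited++;
          if (task.stageId == stageId) {
            result.add(task);
          }
        }
        return result;
      }
    }
    ```
    
    With 3 stages of 4 tasks each, every call to `tasksOfStage` in this sketch inspects all 12 entries, including a call for a stage that does not exist.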
    
    This can be very slow for a large application with many stages and tasks. We can improve it by allowing the natural key of an entity to have a real parent index, so that on each lookup with a parent provided, Spark first looks up the natural keys of the children (in our case, the task IDs) and then fetches the data for those natural keys from the hashmap.
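    
    The idea can be sketched as follows. This is an illustration under assumptions, not the code of this patch: `IndexedTask` and `ParentIndexSketch` are hypothetical names, the parent is a plain stage ID, and `ConcurrentHashMap.newKeySet()` is used as the concurrent set of child keys in place of the patch's `NaturalKeys` alias class.
    
    ```java
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical stand-in for a task record; not Spark's TaskDataWrapper.
    final class IndexedTask {
      final long taskId;
      final int stageId;

      IndexedTask(long taskId, int stageId) {
        this.taskId = taskId;
        this.stageId = stageId;
      }
    }

    // Hypothetical store with a parent index next to the natural-key map.
    final class ParentIndexSketch {
      // Natural key (task ID) -> task data.
      private final Map<Long, IndexedTask> data = new ConcurrentHashMap<>();
      // Parent key (stage ID) -> natural keys of its children.
      private final Map<Integer, Set<Long>> parentToChildren = new ConcurrentHashMap<>();

      void put(IndexedTask task) {
        data.put(task.taskId, task);
        parentToChildren
          .computeIfAbsent(task.stageId, k -> ConcurrentHashMap.newKeySet())
          .add(task.taskId);
      }

      // Touches only the children of the given stage, not the whole map.
      List<IndexedTask> tasksOfStage(int stageId) {
        List<IndexedTask> result = new ArrayList<>();
        Set<Long> children =
          parentToChildren.getOrDefault(stageId, Collections.<Long>emptySet());
        for (Long taskId : children) {
          IndexedTask task = data.get(taskId);
          if (task != null) {
            result.add(task);
          }
        }
        return result;
      }

      // Removes all tasks of a stage and returns how many were removed.
      int removeStage(int stageId) {
        Set<Long> children = parentToChildren.remove(stageId);
        if (children == null) {
          return 0;
        }
        int count = 0;
        for (Long taskId : children) {
          if (data.remove(taskId) != null) {
            count++;
          }
        }
        return count;
      }

      int size() {
        return data.size();
      }
    }
    ```
    
    The cost of a stage lookup or a stage clean-up in this sketch depends on the number of tasks in that stage rather than on the number of tasks in the application, at the price of maintaining a second map on every write.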
    
    ### Why are the changes needed?
    
    The in-memory KV store becomes really slow for large applications. We can improve it with a new index. With it, the affected operations can be 10 times, 100 times, or even 1000 times faster.
    This can also make the Spark driver more stable for large applications.
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing unit tests.
    I also ran a benchmark with the following code:
    ```
      // `newTaskData` and `attemptId` are not defined in this snippet.
      val store = new InMemoryStore()
      val numberOfTasksPerStage = 10000
      (0 until 1000).foreach { sId =>
        (0 until numberOfTasksPerStage).foreach { taskId =>
          val task = newTaskData(sId * numberOfTasksPerStage + taskId, "SUCCESS", sId)
          store.write(task)
        }
      }
      val appStatusStore = new AppStatusStore(store)
      var start = System.nanoTime()
      appStatusStore.taskSummary(2, attemptId, Array(0, 0.25, 0.5, 0.75, 1))
      println("task summary run time: " + ((System.nanoTime() - start) / 1000000))
      val stageIds = Seq(1, 11, 66, 88)
      val stageKeys = stageIds.map(Array(_, attemptId))
      start = System.nanoTime()
      store.removeAllByIndexValues(classOf[TaskDataWrapper], TaskIndexNames.STAGE,
        stageKeys.asJavaCollection)
      println("clean up tasks run time: " + ((System.nanoTime() - start) / 1000000))
    ```
    
    Task summary before the changes: 98642ms
    Task summary after the changes: 120ms
    
    Task clean up before the changes: 4900ms
    Task clean up after the changes: 4ms
    
    In this micro-benchmark, the task summary is about 800x faster and the task clean up is more than 1000x faster after the changes.
    
    Closes #27716 from gengliangwang/liveUIStore.
    
    Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 6b641430c37e0115bee781fed7360187b988313d)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/util/kvstore/InMemoryStore.java   | 116 +++++++++++++++++----
 .../org/apache/spark/util/kvstore/KVTypeInfo.java  |   7 +-
 .../apache/spark/util/kvstore/LevelDBTypeInfo.java |   5 +-
 .../scala/org/apache/spark/status/storeTypes.scala |   2 +-
 4 files changed, 105 insertions(+), 25 deletions(-)

diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
index b33c538..db08740 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
@@ -163,6 +163,12 @@ public class InMemoryStore implements KVStore {
     }
   }
 
+  /**
+   * An alias class for the type "ConcurrentHashMap<Comparable<Object>, Boolean>", which is used
+   * as a concurrent hashset for storing natural keys and the boolean value doesn't matter.
+   */
+  private static class NaturalKeys extends ConcurrentHashMap<Comparable<Object>, Boolean> {}
+
   private static class InstanceList<T> {
 
     /**
@@ -205,11 +211,19 @@ public class InMemoryStore implements KVStore {
     private final KVTypeInfo ti;
     private final KVTypeInfo.Accessor naturalKey;
     private final ConcurrentMap<Comparable<Object>, T> data;
+    private final String naturalParentIndexName;
+    private final Boolean hasNaturalParentIndex;
+    // A mapping from parent to the natural keys of its children.
+    // For example, a mapping from a stage ID to all the task IDs in the stage.
+    private final ConcurrentMap<Comparable<Object>, NaturalKeys> parentToChildrenMap;
 
     private InstanceList(Class<?> klass) {
       this.ti = new KVTypeInfo(klass);
       this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
       this.data = new ConcurrentHashMap<>();
+      this.naturalParentIndexName = ti.getParentIndexName(KVIndex.NATURAL_INDEX_NAME);
+      this.parentToChildrenMap = new ConcurrentHashMap<>();
+      this.hasNaturalParentIndex = !naturalParentIndexName.isEmpty();
     }
 
     KVTypeInfo.Accessor getIndexAccessor(String indexName) {
@@ -217,11 +231,30 @@ public class InMemoryStore implements KVStore {
     }
 
     int countingRemoveAllByIndexValues(String index, Collection<?> indexValues) {
-      Predicate<? super T> filter = getPredicate(ti.getAccessor(index), indexValues);
-      CountingRemoveIfForEach<T> callback = new CountingRemoveIfForEach<>(data, filter);
+      if (hasNaturalParentIndex && naturalParentIndexName.equals(index)) {
+        // If there is a parent index for the natural index and `index` happens to be it,
+        // Spark can use the `parentToChildrenMap` to get the related natural keys, and then
+        // delete them from `data`.
+        int count = 0;
+        for (Object indexValue : indexValues) {
+          Comparable<Object> parentKey = asKey(indexValue);
+          NaturalKeys children = parentToChildrenMap.getOrDefault(parentKey, new NaturalKeys());
+          for (Comparable<Object> naturalKey : children.keySet()) {
+            data.remove(naturalKey);
+            count ++;
+          }
+          parentToChildrenMap.remove(parentKey);
+        }
+        return count;
+      } else {
+        Predicate<? super T> filter = getPredicate(ti.getAccessor(index), indexValues);
+        CountingRemoveIfForEach<T> callback = new CountingRemoveIfForEach<>(data, filter);
 
-      data.forEach(callback);
-      return callback.count();
+        // Go through all the values in `data` and delete objects that meet the predicate `filter`.
+        // This can be slow when there is a large number of entries in `data`.
+        data.forEach(callback);
+        return callback.count();
+      }
     }
 
     public T get(Object key) {
@@ -230,10 +263,27 @@ public class InMemoryStore implements KVStore {
 
     public void put(T value) throws Exception {
       data.put(asKey(naturalKey.get(value)), value);
+      if (hasNaturalParentIndex) {
+        Comparable<Object> parentKey = asKey(getIndexAccessor(naturalParentIndexName).get(value));
+        NaturalKeys children =
+          parentToChildrenMap.computeIfAbsent(parentKey, k -> new NaturalKeys());
+        children.put(asKey(naturalKey.get(value)), true);
+      }
     }
 
     public void delete(Object key) {
       data.remove(asKey(key));
+      if (hasNaturalParentIndex) {
+        for (NaturalKeys v : parentToChildrenMap.values()) {
+          if (v.remove(asKey(key))) {
+            // `v` can be empty after removing the natural key and we can remove it from
+            // `parentToChildrenMap`. However, `parentToChildrenMap` is a ConcurrentMap and such
+            // checking and deleting can be slow.
+            // This method is to delete one object with certain key, let's make it simple here.
+            break;
+          }
+        }
+      }
     }
 
     public int size() {
@@ -241,7 +291,7 @@ public class InMemoryStore implements KVStore {
     }
 
     public InMemoryView<T> view() {
-      return new InMemoryView<>(data.values(), ti);
+      return new InMemoryView<>(data, ti, naturalParentIndexName, parentToChildrenMap);
     }
 
     private static <T> Predicate<? super T> getPredicate(
@@ -271,22 +321,32 @@ public class InMemoryStore implements KVStore {
 
   private static class InMemoryView<T> extends KVStoreView<T> {
     private static final InMemoryView<?> EMPTY_VIEW =
-      new InMemoryView<>(Collections.emptyList(), null);
+      new InMemoryView<>(new ConcurrentHashMap<>(), null, "", new ConcurrentHashMap<>());
 
-    private final Collection<T> elements;
+    private final ConcurrentMap<Comparable<Object>, T> data;
     private final KVTypeInfo ti;
     private final KVTypeInfo.Accessor natural;
-
-    InMemoryView(Collection<T> elements, KVTypeInfo ti) {
-      this.elements = elements;
+    private final ConcurrentMap<Comparable<Object>, NaturalKeys> parentToChildrenMap;
+    private final String naturalParentIndexName;
+    private final Boolean hasNaturalParentIndex;
+
+    InMemoryView(
+        ConcurrentMap<Comparable<Object>, T> data,
+        KVTypeInfo ti,
+        String naturalParentIndexName,
+        ConcurrentMap<Comparable<Object>, NaturalKeys> parentToChildrenMap) {
+      this.data = data;
       this.ti = ti;
       this.natural = ti != null ? ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) : null;
+      this.naturalParentIndexName = naturalParentIndexName;
+      this.parentToChildrenMap = parentToChildrenMap;
+      this.hasNaturalParentIndex = !naturalParentIndexName.isEmpty();
     }
 
     @Override
     public Iterator<T> iterator() {
-      if (elements.isEmpty()) {
-        return new InMemoryIterator<>(elements.iterator());
+      if (data.isEmpty()) {
+        return new InMemoryIterator<>(Collections.emptyIterator());
       }
 
       KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null;
@@ -322,15 +382,31 @@ public class InMemoryStore implements KVStore {
      */
     private List<T> copyElements() {
       if (parent != null) {
-        KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index);
-        Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index.");
-        Comparable<?> parentKey = asKey(parent);
-
-        return elements.stream()
-          .filter(e -> compare(e, parentGetter, parentKey) == 0)
-          .collect(Collectors.toList());
+        Comparable<Object> parentKey = asKey(parent);
+        if (hasNaturalParentIndex && naturalParentIndexName.equals(ti.getParentIndexName(index))) {
+          // If there is a parent index for the natural index and the parent of `index` happens to
+          // be it, Spark can use the `parentToChildrenMap` to get the related natural keys, and
+          // then copy them from `data`.
+          NaturalKeys children = parentToChildrenMap.getOrDefault(parentKey, new NaturalKeys());
+          ArrayList<T> elements = new ArrayList<>();
+          for (Comparable<Object> naturalKey : children.keySet()) {
+            data.computeIfPresent(naturalKey, (k, v) -> {
+              elements.add(v);
+              return v;
+            });
+          }
+          return elements;
+        } else {
+          // Go through all the values in `data` and collect all the objects that have a certain parent
+          // value. This can be slow when there is a large number of entries in `data`.
+          KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index);
+          Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index.");
+          return data.values().stream()
+            .filter(e -> compare(e, parentGetter, parentKey) == 0)
+            .collect(Collectors.toList());
+        }
       } else {
-        return new ArrayList<>(elements);
+        return new ArrayList<>(data.values());
       }
     }
 
diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
index d2a2698..5404d33 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
@@ -68,8 +68,6 @@ public class KVTypeInfo {
 
     Preconditions.checkArgument(indices.containsKey(KVIndex.NATURAL_INDEX_NAME),
         "No natural index defined for type %s.", type.getName());
-    Preconditions.checkArgument(indices.get(KVIndex.NATURAL_INDEX_NAME).parent().isEmpty(),
-        "Natural index of %s cannot have a parent.", type.getName());
 
     for (KVIndex idx : indices.values()) {
       if (!idx.parent().isEmpty()) {
@@ -117,6 +115,11 @@ public class KVTypeInfo {
     return index.parent().isEmpty() ? null : getAccessor(index.parent());
   }
 
+  String getParentIndexName(String indexName) {
+    KVIndex index = indices.get(indexName);
+    return index.parent();
+  }
+
   /**
    * Abstracts the difference between invoking a Field and a Method.
    */
diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java
index f4d3592..d742353 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java
@@ -133,12 +133,13 @@ class LevelDBTypeInfo {
 
     // First create the parent indices, then the child indices.
     ti.indices().forEach(idx -> {
-      if (idx.parent().isEmpty()) {
+      // In LevelDB, there is no parent index for the NATURAL INDEX.
+      if (idx.parent().isEmpty() || idx.value().equals(KVIndex.NATURAL_INDEX_NAME)) {
         indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()), null));
       }
     });
     ti.indices().forEach(idx -> {
-      if (!idx.parent().isEmpty()) {
+      if (!idx.parent().isEmpty() && !idx.value().equals(KVIndex.NATURAL_INDEX_NAME)) {
         indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()),
           indices.get(idx.parent())));
       }
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index f0a94d8..c957ff7 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -154,7 +154,7 @@ private[spark] object TaskIndexNames {
 private[spark] class TaskDataWrapper(
     // Storing this as an object actually saves memory; it's also used as the key in the in-memory
     // store, so in that case you'd save the extra copy of the value here.
-    @KVIndexParam
+    @KVIndexParam(parent = TaskIndexNames.STAGE)
     val taskId: JLong,
     @KVIndexParam(value = TaskIndexNames.TASK_INDEX, parent = TaskIndexNames.STAGE)
     val index: Int,

