[SPARK-21671][CORE] Move kvstore to "util" sub-package, add private annotation.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #18886 from vanzin/SPARK-21671.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c1bfb49
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c1bfb49
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c1bfb49

Branch: refs/heads/master
Commit: 2c1bfb497f31ff402796b57b617a9075c6044e4d
Parents: 979bf94
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Tue Aug 8 14:33:27 2017 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Tue Aug 8 14:33:27 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/kvstore/ArrayWrappers.java | 213 --------
 .../org/apache/spark/kvstore/InMemoryStore.java | 320 ------------
 .../java/org/apache/spark/kvstore/KVIndex.java  |  82 ---
 .../java/org/apache/spark/kvstore/KVStore.java  | 126 -----
 .../apache/spark/kvstore/KVStoreIterator.java   |  47 --
 .../apache/spark/kvstore/KVStoreSerializer.java |  86 ----
 .../org/apache/spark/kvstore/KVStoreView.java   | 123 -----
 .../org/apache/spark/kvstore/KVTypeInfo.java    | 154 ------
 .../java/org/apache/spark/kvstore/LevelDB.java  | 307 -----------
 .../apache/spark/kvstore/LevelDBIterator.java   | 277 ----------
 .../apache/spark/kvstore/LevelDBTypeInfo.java   | 511 -------------------
 .../UnsupportedStoreVersionException.java       |  27 -
 .../spark/util/kvstore/ArrayWrappers.java       | 213 ++++++++
 .../spark/util/kvstore/InMemoryStore.java       | 320 ++++++++++++
 .../org/apache/spark/util/kvstore/KVIndex.java  |  85 +++
 .../org/apache/spark/util/kvstore/KVStore.java  | 129 +++++
 .../spark/util/kvstore/KVStoreIterator.java     |  50 ++
 .../spark/util/kvstore/KVStoreSerializer.java   |  89 ++++
 .../apache/spark/util/kvstore/KVStoreView.java  | 126 +++++
 .../apache/spark/util/kvstore/KVTypeInfo.java   | 157 ++++++
 .../org/apache/spark/util/kvstore/LevelDB.java  | 310 +++++++++++
 .../spark/util/kvstore/LevelDBIterator.java     | 277 ++++++++++
 .../spark/util/kvstore/LevelDBTypeInfo.java     | 511 +++++++++++++++++++
 .../UnsupportedStoreVersionException.java       |  30 ++
 .../apache/spark/kvstore/ArrayKeyIndexType.java |  44 --
 .../spark/kvstore/ArrayWrappersSuite.java       |  59 ---
 .../org/apache/spark/kvstore/CustomType1.java   |  63 ---
 .../apache/spark/kvstore/DBIteratorSuite.java   | 504 ------------------
 .../spark/kvstore/InMemoryIteratorSuite.java    |  27 -
 .../spark/kvstore/InMemoryStoreSuite.java       | 161 ------
 .../apache/spark/kvstore/LevelDBBenchmark.java  | 280 ----------
 .../spark/kvstore/LevelDBIteratorSuite.java     |  48 --
 .../org/apache/spark/kvstore/LevelDBSuite.java  | 286 -----------
 .../spark/kvstore/LevelDBTypeInfoSuite.java     | 207 --------
 .../spark/util/kvstore/ArrayKeyIndexType.java   |  44 ++
 .../spark/util/kvstore/ArrayWrappersSuite.java  |  59 +++
 .../apache/spark/util/kvstore/CustomType1.java  |  63 +++
 .../spark/util/kvstore/DBIteratorSuite.java     | 504 ++++++++++++++++++
 .../util/kvstore/InMemoryIteratorSuite.java     |  27 +
 .../spark/util/kvstore/InMemoryStoreSuite.java  | 161 ++++++
 .../spark/util/kvstore/LevelDBBenchmark.java    | 280 ++++++++++
 .../util/kvstore/LevelDBIteratorSuite.java      |  48 ++
 .../apache/spark/util/kvstore/LevelDBSuite.java | 286 +++++++++++
 .../util/kvstore/LevelDBTypeInfoSuite.java      | 207 ++++++++
 44 files changed, 3976 insertions(+), 3952 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/ArrayWrappers.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/ArrayWrappers.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/ArrayWrappers.java
deleted file mode 100644
index 5efa842..0000000
--- a/common/kvstore/src/main/java/org/apache/spark/kvstore/ArrayWrappers.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import java.util.Arrays;
-
-import com.google.common.base.Preconditions;
-
-/**
- * A factory for array wrappers so that arrays can be used as keys in a map, sorted or not.
- *
- * The comparator implementation makes two assumptions:
- * - All elements are instances of Comparable
- * - When comparing two arrays, they both contain elements of the same type in corresponding
- *   indices.
- *
- * Otherwise, ClassCastExceptions may occur. The equality method can compare any two arrays.
- *
- * This class is not efficient and is mostly meant to compare really small arrays, like those
- * generally used as indices and keys in a KVStore.
- */
-class ArrayWrappers {
-
-  @SuppressWarnings("unchecked")
-  public static Comparable<Object> forArray(Object a) {
-    Preconditions.checkArgument(a.getClass().isArray());
-    Comparable<?> ret;
-    if (a instanceof int[]) {
-      ret = new ComparableIntArray((int[]) a);
-    } else if (a instanceof long[]) {
-      ret = new ComparableLongArray((long[]) a);
-    } else if (a instanceof byte[]) {
-      ret = new ComparableByteArray((byte[]) a);
-    } else {
-      Preconditions.checkArgument(!a.getClass().getComponentType().isPrimitive());
-      ret = new ComparableObjectArray((Object[]) a);
-    }
-    return (Comparable<Object>) ret;
-  }
-
-  private static class ComparableIntArray implements Comparable<ComparableIntArray> {
-
-    private final int[] array;
-
-    ComparableIntArray(int[] array) {
-      this.array = array;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (!(other instanceof ComparableIntArray)) {
-        return false;
-      }
-      return Arrays.equals(array, ((ComparableIntArray) other).array);
-    }
-
-    @Override
-    public int hashCode() {
-      int code = 0;
-      for (int i = 0; i < array.length; i++) {
-        code = (code * 31) + array[i];
-      }
-      return code;
-    }
-
-    @Override
-    public int compareTo(ComparableIntArray other) {
-      int len = Math.min(array.length, other.array.length);
-      for (int i = 0; i < len; i++) {
-        int diff = array[i] - other.array[i];
-        if (diff != 0) {
-          return diff;
-        }
-      }
-
-      return array.length - other.array.length;
-    }
-  }
-
-  private static class ComparableLongArray implements Comparable<ComparableLongArray> {
-
-    private final long[] array;
-
-    ComparableLongArray(long[] array) {
-      this.array = array;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (!(other instanceof ComparableLongArray)) {
-        return false;
-      }
-      return Arrays.equals(array, ((ComparableLongArray) other).array);
-    }
-
-    @Override
-    public int hashCode() {
-      int code = 0;
-      for (int i = 0; i < array.length; i++) {
-        code = (code * 31) + (int) array[i];
-      }
-      return code;
-    }
-
-    @Override
-    public int compareTo(ComparableLongArray other) {
-      int len = Math.min(array.length, other.array.length);
-      for (int i = 0; i < len; i++) {
-        long diff = array[i] - other.array[i];
-        if (diff != 0) {
-          return diff > 0 ? 1 : -1;
-        }
-      }
-
-      return array.length - other.array.length;
-    }
-  }
-
-  private static class ComparableByteArray implements Comparable<ComparableByteArray> {
-
-    private final byte[] array;
-
-    ComparableByteArray(byte[] array) {
-      this.array = array;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (!(other instanceof ComparableByteArray)) {
-        return false;
-      }
-      return Arrays.equals(array, ((ComparableByteArray) other).array);
-    }
-
-    @Override
-    public int hashCode() {
-      int code = 0;
-      for (int i = 0; i < array.length; i++) {
-        code = (code * 31) + array[i];
-      }
-      return code;
-    }
-
-    @Override
-    public int compareTo(ComparableByteArray other) {
-      int len = Math.min(array.length, other.array.length);
-      for (int i = 0; i < len; i++) {
-        int diff = array[i] - other.array[i];
-        if (diff != 0) {
-          return diff;
-        }
-      }
-
-      return array.length - other.array.length;
-    }
-  }
-
-  private static class ComparableObjectArray implements Comparable<ComparableObjectArray> {
-
-    private final Object[] array;
-
-    ComparableObjectArray(Object[] array) {
-      this.array = array;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (!(other instanceof ComparableObjectArray)) {
-        return false;
-      }
-      return Arrays.equals(array, ((ComparableObjectArray) other).array);
-    }
-
-    @Override
-    public int hashCode() {
-      int code = 0;
-      for (int i = 0; i < array.length; i++) {
-        code = (code * 31) + array[i].hashCode();
-      }
-      return code;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public int compareTo(ComparableObjectArray other) {
-      int len = Math.min(array.length, other.array.length);
-      for (int i = 0; i < len; i++) {
-        int diff = ((Comparable<Object>) array[i]).compareTo((Comparable<Object>) other.array[i]);
-        if (diff != 0) {
-          return diff;
-        }
-      }
-
-      return array.length - other.array.length;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java
deleted file mode 100644
index f3aeb82..0000000
--- a/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-
-import org.apache.spark.annotation.Private;
-
-/**
- * Implementation of KVStore that keeps data deserialized in memory. This store does not index
- * data; instead, whenever iterating over an indexed field, the stored data is copied and sorted
- * according to the index. This saves memory but makes iteration more expensive.
- */
-@Private
-public class InMemoryStore implements KVStore {
-
-  private Object metadata;
-  private ConcurrentMap<Class<?>, InstanceList> data = new ConcurrentHashMap<>();
-
-  @Override
-  public <T> T getMetadata(Class<T> klass) {
-    return klass.cast(metadata);
-  }
-
-  @Override
-  public void setMetadata(Object value) {
-    this.metadata = value;
-  }
-
-  @Override
-  public long count(Class<?> type) {
-    InstanceList list = data.get(type);
-    return list != null ? list.size() : 0;
-  }
-
-  @Override
-  public long count(Class<?> type, String index, Object indexedValue) throws Exception {
-    InstanceList list = data.get(type);
-    int count = 0;
-    Object comparable = asKey(indexedValue);
-    KVTypeInfo.Accessor accessor = list.getIndexAccessor(index);
-    for (Object o : view(type)) {
-      if (Objects.equal(comparable, asKey(accessor.get(o)))) {
-        count++;
-      }
-    }
-    return count;
-  }
-
-  @Override
-  public <T> T read(Class<T> klass, Object naturalKey) {
-    InstanceList list = data.get(klass);
-    Object value = list != null ? list.get(naturalKey) : null;
-    if (value == null) {
-      throw new NoSuchElementException();
-    }
-    return klass.cast(value);
-  }
-
-  @Override
-  public void write(Object value) throws Exception {
-    InstanceList list = data.computeIfAbsent(value.getClass(), key -> {
-      try {
-        return new InstanceList(key);
-      } catch (Exception e) {
-        throw Throwables.propagate(e);
-      }
-    });
-    list.put(value);
-  }
-
-  @Override
-  public void delete(Class<?> type, Object naturalKey) {
-    InstanceList list = data.get(type);
-    if (list != null) {
-      list.delete(naturalKey);
-    }
-  }
-
-  @Override
-  public <T> KVStoreView<T> view(Class<T> type){
-    InstanceList list = data.get(type);
-    return list != null ? list.view(type)
-      : new InMemoryView<>(type, Collections.<T>emptyList(), null);
-  }
-
-  @Override
-  public void close() {
-    metadata = null;
-    data.clear();
-  }
-
-  @SuppressWarnings("unchecked")
-  private static Comparable<Object> asKey(Object in) {
-    if (in.getClass().isArray()) {
-      in = ArrayWrappers.forArray(in);
-    }
-    return (Comparable<Object>) in;
-  }
-
-  private static class InstanceList {
-
-    private final KVTypeInfo ti;
-    private final KVTypeInfo.Accessor naturalKey;
-    private final ConcurrentMap<Comparable<Object>, Object> data;
-
-    private int size;
-
-    private InstanceList(Class<?> type) throws Exception {
-      this.ti = new KVTypeInfo(type);
-      this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
-      this.data = new ConcurrentHashMap<>();
-      this.size = 0;
-    }
-
-    KVTypeInfo.Accessor getIndexAccessor(String indexName) {
-      return ti.getAccessor(indexName);
-    }
-
-    public Object get(Object key) {
-      return data.get(asKey(key));
-    }
-
-    public void put(Object value) throws Exception {
-      Preconditions.checkArgument(ti.type().equals(value.getClass()),
-        "Unexpected type: %s", value.getClass());
-      if (data.put(asKey(naturalKey.get(value)), value) == null) {
-        size++;
-      }
-    }
-
-    public void delete(Object key) {
-      if (data.remove(asKey(key)) != null) {
-        size--;
-      }
-    }
-
-    public int size() {
-      return size;
-    }
-
-    @SuppressWarnings("unchecked")
-    public <T> InMemoryView<T> view(Class<T> type) {
-      Preconditions.checkArgument(ti.type().equals(type), "Unexpected type: %s", type);
-      Collection<T> all = (Collection<T>) data.values();
-      return new InMemoryView(type, all, ti);
-    }
-
-  }
-
-  private static class InMemoryView<T> extends KVStoreView<T> {
-
-    private final Collection<T> elements;
-    private final KVTypeInfo ti;
-    private final KVTypeInfo.Accessor natural;
-
-    InMemoryView(Class<T> type, Collection<T> elements, KVTypeInfo ti) {
-      super(type);
-      this.elements = elements;
-      this.ti = ti;
-      this.natural = ti != null ? ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) : null;
-    }
-
-    @Override
-    public Iterator<T> iterator() {
-      if (elements.isEmpty()) {
-        return new InMemoryIterator<>(elements.iterator());
-      }
-
-      try {
-        KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null;
-        int modifier = ascending ? 1 : -1;
-
-        final List<T> sorted = copyElements();
-        Collections.sort(sorted, (e1, e2) -> modifier * compare(e1, e2, getter));
-        Stream<T> stream = sorted.stream();
-
-        if (first != null) {
-          stream = stream.filter(e -> modifier * compare(e, getter, first) >= 0);
-        }
-
-        if (last != null) {
-          stream = stream.filter(e -> modifier * compare(e, getter, last) <= 0);
-        }
-
-        if (skip > 0) {
-          stream = stream.skip(skip);
-        }
-
-        if (max < sorted.size()) {
-          stream = stream.limit((int) max);
-        }
-
-        return new InMemoryIterator<>(stream.iterator());
-      } catch (Exception e) {
-        throw Throwables.propagate(e);
-      }
-    }
-
-    /**
-     * Create a copy of the input elements, filtering the values for child indices if needed.
-     */
-    private List<T> copyElements() {
-      if (parent != null) {
-        KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index);
-        Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index.");
-
-        return elements.stream()
-          .filter(e -> compare(e, parentGetter, parent) == 0)
-          .collect(Collectors.toList());
-      } else {
-        return new ArrayList<>(elements);
-      }
-    }
-
-    private int compare(T e1, T e2, KVTypeInfo.Accessor getter) {
-      try {
-        int diff = compare(e1, getter, getter.get(e2));
-        if (diff == 0 && getter != natural) {
-          diff = compare(e1, natural, natural.get(e2));
-        }
-        return diff;
-      } catch (Exception e) {
-        throw Throwables.propagate(e);
-      }
-    }
-
-    private int compare(T e1, KVTypeInfo.Accessor getter, Object v2) {
-      try {
-        return asKey(getter.get(e1)).compareTo(asKey(v2));
-      } catch (Exception e) {
-        throw Throwables.propagate(e);
-      }
-    }
-
-  }
-
-  private static class InMemoryIterator<T> implements KVStoreIterator<T> {
-
-    private final Iterator<T> iter;
-
-    InMemoryIterator(Iterator<T> iter) {
-      this.iter = iter;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return iter.hasNext();
-    }
-
-    @Override
-    public T next() {
-      return iter.next();
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public List<T> next(int max) {
-      List<T> list = new ArrayList<>(max);
-      while (hasNext() && list.size() < max) {
-        list.add(next());
-      }
-      return list;
-    }
-
-    @Override
-    public boolean skip(long n) {
-      long skipped = 0;
-      while (skipped < n) {
-        if (hasNext()) {
-          next();
-          skipped++;
-        } else {
-          return false;
-        }
-      }
-
-      return hasNext();
-    }
-
-    @Override
-    public void close() {
-      // no op.
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/KVIndex.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVIndex.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVIndex.java
deleted file mode 100644
index 0cffefe..0000000
--- a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVIndex.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * Tags a field to be indexed when storing an object.
- *
- * <p>
- * Types are required to have a natural index that uniquely identifies instances in the store.
- * The default value of the annotation identifies the natural index for the type.
- * </p>
- *
- * <p>
- * Indexes allow for more efficient sorting of data read from the store. By annotating a field or
- * "getter" method with this annotation, an index will be created that will provide sorting based on
- * the string value of that field.
- * </p>
- *
- * <p>
- * Note that creating indices means more space will be needed, and maintenance operations like
- * updating or deleting a value will become more expensive.
- * </p>
- *
- * <p>
- * Indices are restricted to String, integral types (byte, short, int, long, boolean), and arrays
- * of those values.
- * </p>
- */
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.FIELD, ElementType.METHOD})
-public @interface KVIndex {
-
-  String NATURAL_INDEX_NAME = "__main__";
-
-  /**
-   * The name of the index to be created for the annotated entity. Must be unique within
-   * the class. Index names are not allowed to start with an underscore (that's reserved for
-   * internal use). The default value is the natural index name (which is always a copy index
-   * regardless of the annotation's values).
-   */
-  String value() default NATURAL_INDEX_NAME;
-
-  /**
-   * The name of the parent index of this index. By default there is no parent index, so the
-   * generated data can be retrieved without having to provide a parent value.
-   *
-   * <p>
-   * If a parent index is defined, iterating over the data using the index will require providing
-   * a single value for the parent index. This serves as a rudimentary way to provide relationships
-   * between entities in the store.
-   * </p>
-   */
-  String parent() default "";
-
-  /**
-   * Whether to copy the instance's data to the index, instead of just storing a pointer to the
-   * data. The default behavior is to just store a reference; that saves disk space but is slower
-   * to read, since there's a level of indirection.
-   */
-  boolean copy() default false;
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java
deleted file mode 100644
index c7808ea..0000000
--- a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import java.io.Closeable;
-
-/**
- * Abstraction for a local key/value store for storing app data.
- *
- * <p>
- * There are two main features provided by the implementations of this interface:
- * </p>
- *
- * <h3>Serialization</h3>
- *
- * <p>
- * If the underlying data store requires serialization, data will be serialized to and deserialized
- * using a {@link KVStoreSerializer}, which can be customized by the application. The serializer is
- * based on Jackson, so it supports all the Jackson annotations for controlling the serialization of
- * app-defined types.
- * </p>
- *
- * <p>
- * Data is also automatically compressed to save disk space.
- * </p>
- *
- * <h3>Automatic Key Management</h3>
- *
- * <p>
- * When using the built-in key management, the implementation will automatically create unique
- * keys for each type written to the store. Keys are based on the type name, and always start
- * with the "+" prefix character (so that it's easy to use both manual and automatic key
- * management APIs without conflicts).
- * </p>
- *
- * <p>
- * Another feature of automatic key management is indexing; by annotating fields or methods of
- * objects written to the store with {@link KVIndex}, indices are created to sort the data
- * by the values of those properties. This makes it possible to provide sorting without having
- * to load all instances of those types from the store.
- * </p>
- *
- * <p>
- * KVStore instances are thread-safe for both reads and writes.
- * </p>
- */
-public interface KVStore extends Closeable {
-
-  /**
-   * Returns app-specific metadata from the store, or null if it's not currently set.
-   *
-   * <p>
-   * The metadata type is application-specific. This is a convenience method so that applications
-   * don't need to define their own keys for this information.
-   * </p>
-   */
-  <T> T getMetadata(Class<T> klass) throws Exception;
-
-  /**
-   * Writes the given value in the store metadata key.
-   */
-  void setMetadata(Object value) throws Exception;
-
-  /**
-   * Read a specific instance of an object.
-   *
-   * @param naturalKey The object's "natural key", which uniquely identifies it. Null keys
-   *                   are not allowed.
-   * @throws java.util.NoSuchElementException If an element with the given key does not exist.
-   */
-  <T> T read(Class<T> klass, Object naturalKey) throws Exception;
-
-  /**
-   * Writes the given object to the store, including indexed fields. Indices are updated based
-   * on the annotated fields of the object's class.
-   *
-   * <p>
-   * Writes may be slower when the object already exists in the store, since it will involve
-   * updating existing indices.
-   * </p>
-   *
-   * @param value The object to write.
-   */
-  void write(Object value) throws Exception;
-
-  /**
-   * Removes an object and all data related to it, like index entries, from the store.
-   *
-   * @param type The object's type.
-   * @param naturalKey The object's "natural key", which uniquely identifies it. Null keys
-   *                   are not allowed.
-   * @throws java.util.NoSuchElementException If an element with the given key does not exist.
-   */
-  void delete(Class<?> type, Object naturalKey) throws Exception;
-
-  /**
-   * Returns a configurable view for iterating over entities of the given type.
-   */
-  <T> KVStoreView<T> view(Class<T> type) throws Exception;
-
-  /**
-   * Returns the number of items of the given type currently in the store.
-   */
-  long count(Class<?> type) throws Exception;
-
-  /**
-   * Returns the number of items of the given type which match the given indexed value.
-   */
-  long count(Class<?> type, String index, Object indexedValue) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreIterator.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreIterator.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreIterator.java
deleted file mode 100644
index 3efdec9..0000000
--- a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreIterator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * An iterator for KVStore.
- *
- * <p>
- * Iterators may keep references to resources that need to be closed. It's recommended that users
- * explicitly close iterators after they're used.
- * </p>
- */
-public interface KVStoreIterator<T> extends Iterator<T>, AutoCloseable {
-
-  /**
-   * Retrieve multiple elements from the store.
-   *
-   * @param max Maximum number of elements to retrieve.
-   */
-  List<T> next(int max);
-
-  /**
-   * Skip in the iterator.
-   *
-   * @return Whether there are items left after skipping.
-   */
-  boolean skip(long n);
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java
deleted file mode 100644
index b84ec91..0000000
--- a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-/**
- * Serializer used to translate between app-defined types and the LevelDB store.
- *
- * <p>
- * The serializer is based on Jackson, so values are written as JSON. It also allows "naked strings"
- * and integers to be written as values directly, which will be written as UTF-8 strings.
- * </p>
- */
-public class KVStoreSerializer {
-
-  /**
-   * Object mapper used to process app-specific types. If an application requires a specific
-   * configuration of the mapper, it can subclass this serializer and add custom configuration
-   * to this object.
-   */
-  protected final ObjectMapper mapper;
-
-  public KVStoreSerializer() {
-    this.mapper = new ObjectMapper();
-  }
-
-  public final byte[] serialize(Object o) throws Exception {
-    if (o instanceof String) {
-      return ((String) o).getBytes(UTF_8);
-    } else {
-      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-      GZIPOutputStream out = new GZIPOutputStream(bytes);
-      try {
-        mapper.writeValue(out, o);
-      } finally {
-        out.close();
-      }
-      return bytes.toByteArray();
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public final <T> T deserialize(byte[] data, Class<T> klass) throws Exception {
-    if (klass.equals(String.class)) {
-      return (T) new String(data, UTF_8);
-    } else {
-      GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data));
-      try {
-        return mapper.readValue(in, klass);
-      } finally {
-        in.close();
-      }
-    }
-  }
-
-  final byte[] serialize(long value) {
-    return String.valueOf(value).getBytes(UTF_8);
-  }
-
-  final long deserializeLong(byte[] data) {
-    return Long.parseLong(new String(data, UTF_8));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java
deleted file mode 100644
index 8cd1f52..0000000
--- a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import com.google.common.base.Preconditions;
-
-/**
- * A configurable view that allows iterating over values in a {@link KVStore}.
- *
- * <p>
- * The different methods can be used to configure the behavior of the iterator. Calling the same
- * method multiple times is allowed; the most recent value will be used.
- * </p>
- *
- * <p>
- * The iterators returned by this view are of type {@link KVStoreIterator}; they auto-close
- * when used in a for loop that exhausts their contents, but when used manually, they need
- * to be closed explicitly unless all elements are read.
- * </p>
- */
-public abstract class KVStoreView<T> implements Iterable<T> {
-
-  final Class<T> type;
-
-  boolean ascending = true;
-  String index = KVIndex.NATURAL_INDEX_NAME;
-  Object first = null;
-  Object last = null;
-  Object parent = null;
-  long skip = 0L;
-  long max = Long.MAX_VALUE;
-
-  public KVStoreView(Class<T> type) {
-    this.type = type;
-  }
-
-  /**
-   * Reverses the order of iteration. By default, iterates in ascending order.
-   */
-  public KVStoreView<T> reverse() {
-    ascending = !ascending;
-    return this;
-  }
-
-  /**
-   * Iterates according to the given index.
-   */
-  public KVStoreView<T> index(String name) {
-    this.index = Preconditions.checkNotNull(name);
-    return this;
-  }
-
-  /**
-   * Defines the value of the parent index when iterating over a child index. Only elements that
-   * match the parent index's value will be included in the iteration.
-   *
-   * <p>
-   * Required for iterating over child indices, will generate an error if iterating over a
-   * parent-less index.
-   * </p>
-   */
-  public KVStoreView<T> parent(Object value) {
-    this.parent = value;
-    return this;
-  }
-
-  /**
-   * Iterates starting at the given value of the chosen index (inclusive).
-   */
-  public KVStoreView<T> first(Object value) {
-    this.first = value;
-    return this;
-  }
-
-  /**
-   * Stops iteration at the given value of the chosen index (inclusive).
-   */
-  public KVStoreView<T> last(Object value) {
-    this.last = value;
-    return this;
-  }
-
-  /**
-   * Stops iteration after a number of elements has been retrieved.
-   */
-  public KVStoreView<T> max(long max) {
-    Preconditions.checkArgument(max > 0L, "max must be positive.");
-    this.max = max;
-    return this;
-  }
-
-  /**
-   * Skips a number of elements at the start of iteration. Skipped elements are not accounted
-   * when using {@link #max(long)}.
-   */
-  public KVStoreView<T> skip(long n) {
-    this.skip = n;
-    return this;
-  }
-
-  /**
-   * Returns an iterator for the current configuration.
-   */
-  public KVStoreIterator<T> closeableIterator() throws Exception {
-    return (KVStoreIterator<T>) iterator();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java
deleted file mode 100644
index e3e61e0..0000000
--- a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Stream;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Wrapper around types managed in a KVStore, providing easy access to their indexed fields.
- */
-public class KVTypeInfo {
-
-  private final Class<?> type;
-  private final Map<String, KVIndex> indices;
-  private final Map<String, Accessor> accessors;
-
-  public KVTypeInfo(Class<?> type) throws Exception {
-    this.type = type;
-    this.accessors = new HashMap<>();
-    this.indices = new HashMap<>();
-
-    for (Field f : type.getDeclaredFields()) {
-      KVIndex idx = f.getAnnotation(KVIndex.class);
-      if (idx != null) {
-        checkIndex(idx, indices);
-        indices.put(idx.value(), idx);
-        f.setAccessible(true);
-        accessors.put(idx.value(), new FieldAccessor(f));
-      }
-    }
-
-    for (Method m : type.getDeclaredMethods()) {
-      KVIndex idx = m.getAnnotation(KVIndex.class);
-      if (idx != null) {
-        checkIndex(idx, indices);
-        Preconditions.checkArgument(m.getParameterTypes().length == 0,
-          "Annotated method %s::%s should not have any parameters.", type.getName(), m.getName());
-        indices.put(idx.value(), idx);
-        m.setAccessible(true);
-        accessors.put(idx.value(), new MethodAccessor(m));
-      }
-    }
-
-    Preconditions.checkArgument(indices.containsKey(KVIndex.NATURAL_INDEX_NAME),
-        "No natural index defined for type %s.", type.getName());
-    Preconditions.checkArgument(indices.get(KVIndex.NATURAL_INDEX_NAME).parent().isEmpty(),
-        "Natural index of %s cannot have a parent.", type.getName());
-
-    for (KVIndex idx : indices.values()) {
-      if (!idx.parent().isEmpty()) {
-        KVIndex parent = indices.get(idx.parent());
-        Preconditions.checkArgument(parent != null,
-          "Cannot find parent %s of index %s.", idx.parent(), idx.value());
-        Preconditions.checkArgument(parent.parent().isEmpty(),
-          "Parent index %s of index %s cannot be itself a child index.", idx.parent(), idx.value());
-      }
-    }
-  }
-
-  private void checkIndex(KVIndex idx, Map<String, KVIndex> indices) {
-    Preconditions.checkArgument(idx.value() != null && !idx.value().isEmpty(),
-      "No name provided for index in type %s.", type.getName());
-    Preconditions.checkArgument(
-      !idx.value().startsWith("_") || idx.value().equals(KVIndex.NATURAL_INDEX_NAME),
-      "Index name %s (in type %s) is not allowed.", idx.value(), type.getName());
-    Preconditions.checkArgument(idx.parent().isEmpty() || !idx.parent().equals(idx.value()),
-      "Index %s cannot be parent of itself.", idx.value());
-    Preconditions.checkArgument(!indices.containsKey(idx.value()),
-      "Duplicate index %s for type %s.", idx.value(), type.getName());
-  }
-
-  public Class<?> type() {
-    return type;
-  }
-
-  public Object getIndexValue(String indexName, Object instance) throws Exception {
-    return getAccessor(indexName).get(instance);
-  }
-
-  public Stream<KVIndex> indices() {
-    return indices.values().stream();
-  }
-
-  Accessor getAccessor(String indexName) {
-    Accessor a = accessors.get(indexName);
-    Preconditions.checkArgument(a != null, "No index %s.", indexName);
-    return a;
-  }
-
-  Accessor getParentAccessor(String indexName) {
-    KVIndex index = indices.get(indexName);
-    return index.parent().isEmpty() ? null : getAccessor(index.parent());
-  }
-
-  /**
-   * Abstracts the difference between invoking a Field and a Method.
-   */
-  interface Accessor {
-
-    Object get(Object instance) throws Exception;
-
-  }
-
-  private class FieldAccessor implements Accessor {
-
-    private final Field field;
-
-    FieldAccessor(Field field) {
-      this.field = field;
-    }
-
-    @Override
-    public Object get(Object instance) throws Exception {
-      return field.get(instance);
-    }
-
-  }
-
-  private class MethodAccessor implements Accessor {
-
-    private final Method method;
-
-    MethodAccessor(Method method) {
-      this.method = method;
-    }
-
-    @Override
-    public Object get(Object instance) throws Exception {
-      return method.invoke(instance);
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java
deleted file mode 100644
index 2714135..0000000
--- a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java
+++ /dev/null
@@ -1,307 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import org.fusesource.leveldbjni.JniDBFactory;
-import org.iq80.leveldb.DB;
-import org.iq80.leveldb.Options;
-import org.iq80.leveldb.WriteBatch;
-
-/**
- * Implementation of KVStore that uses LevelDB as the underlying data store.
- */
-public class LevelDB implements KVStore {
-
-  @VisibleForTesting
-  static final long STORE_VERSION = 1L;
-
-  @VisibleForTesting
-  static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
-
-  /** DB key where app metadata is stored. */
-  private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
-
-  /** DB key where type aliases are stored. */
-  private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8);
-
-  final AtomicReference<DB> _db;
-  final KVStoreSerializer serializer;
-
-  /**
-   * Keep a mapping of class names to a shorter, unique ID managed by the store. This serves two
-   * purposes: make the keys stored on disk shorter, and spread out the keys, since class names
-   * will often have a long, redundant prefix (think "org.apache.spark.").
-   */
-  private final ConcurrentMap<String, byte[]> typeAliases;
-  private final ConcurrentMap<Class<?>, LevelDBTypeInfo> types;
-
-  public LevelDB(File path) throws Exception {
-    this(path, new KVStoreSerializer());
-  }
-
-  public LevelDB(File path, KVStoreSerializer serializer) throws Exception {
-    this.serializer = serializer;
-    this.types = new ConcurrentHashMap<>();
-
-    Options options = new Options();
-    options.createIfMissing(!path.exists());
-    this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options));
-
-    byte[] versionData = db().get(STORE_VERSION_KEY);
-    if (versionData != null) {
-      long version = serializer.deserializeLong(versionData);
-      if (version != STORE_VERSION) {
-        throw new UnsupportedStoreVersionException();
-      }
-    } else {
-      db().put(STORE_VERSION_KEY, serializer.serialize(STORE_VERSION));
-    }
-
-    Map<String, byte[]> aliases;
-    try {
-      aliases = get(TYPE_ALIASES_KEY, TypeAliases.class).aliases;
-    } catch (NoSuchElementException e) {
-      aliases = new HashMap<>();
-    }
-    typeAliases = new ConcurrentHashMap<>(aliases);
-  }
-
-  @Override
-  public <T> T getMetadata(Class<T> klass) throws Exception {
-    try {
-      return get(METADATA_KEY, klass);
-    } catch (NoSuchElementException nsee) {
-      return null;
-    }
-  }
-
-  @Override
-  public void setMetadata(Object value) throws Exception {
-    if (value != null) {
-      put(METADATA_KEY, value);
-    } else {
-      db().delete(METADATA_KEY);
-    }
-  }
-
-  <T> T get(byte[] key, Class<T> klass) throws Exception {
-    byte[] data = db().get(key);
-    if (data == null) {
-      throw new NoSuchElementException(new String(key, UTF_8));
-    }
-    return serializer.deserialize(data, klass);
-  }
-
-  private void put(byte[] key, Object value) throws Exception {
-    Preconditions.checkArgument(value != null, "Null values are not allowed.");
-    db().put(key, serializer.serialize(value));
-  }
-
-  @Override
-  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
-    Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
-    byte[] key = getTypeInfo(klass).naturalIndex().start(null, naturalKey);
-    return get(key, klass);
-  }
-
-  @Override
-  public void write(Object value) throws Exception {
-    Preconditions.checkArgument(value != null, "Null values are not allowed.");
-    LevelDBTypeInfo ti = getTypeInfo(value.getClass());
-
-    try (WriteBatch batch = db().createWriteBatch()) {
-      byte[] data = serializer.serialize(value);
-      synchronized (ti) {
-        Object existing;
-        try {
-          existing = get(ti.naturalIndex().entityKey(null, value), value.getClass());
-        } catch (NoSuchElementException e) {
-          existing = null;
-        }
-
-        PrefixCache cache = new PrefixCache(value);
-        byte[] naturalKey = ti.naturalIndex().toKey(ti.naturalIndex().getValue(value));
-        for (LevelDBTypeInfo.Index idx : ti.indices()) {
-          byte[] prefix = cache.getPrefix(idx);
-          idx.add(batch, value, existing, data, naturalKey, prefix);
-        }
-        db().write(batch);
-      }
-    }
-  }
-
-  @Override
-  public void delete(Class<?> type, Object naturalKey) throws Exception {
-    Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
-    try (WriteBatch batch = db().createWriteBatch()) {
-      LevelDBTypeInfo ti = getTypeInfo(type);
-      byte[] key = ti.naturalIndex().start(null, naturalKey);
-      synchronized (ti) {
-        byte[] data = db().get(key);
-        if (data != null) {
-          Object existing = serializer.deserialize(data, type);
-          PrefixCache cache = new PrefixCache(existing);
-          byte[] keyBytes = ti.naturalIndex().toKey(ti.naturalIndex().getValue(existing));
-          for (LevelDBTypeInfo.Index idx : ti.indices()) {
-            idx.remove(batch, existing, keyBytes, cache.getPrefix(idx));
-          }
-          db().write(batch);
-        }
-      }
-    } catch (NoSuchElementException nse) {
-      // Ignore.
-    }
-  }
-
-  @Override
-  public <T> KVStoreView<T> view(Class<T> type) throws Exception {
-    return new KVStoreView<T>(type) {
-      @Override
-      public Iterator<T> iterator() {
-        try {
-          return new LevelDBIterator<>(LevelDB.this, this);
-        } catch (Exception e) {
-          throw Throwables.propagate(e);
-        }
-      }
-    };
-  }
-
-  @Override
-  public long count(Class<?> type) throws Exception {
-    LevelDBTypeInfo.Index idx = getTypeInfo(type).naturalIndex();
-    return idx.getCount(idx.end(null));
-  }
-
-  @Override
-  public long count(Class<?> type, String index, Object indexedValue) throws Exception {
-    LevelDBTypeInfo.Index idx = getTypeInfo(type).index(index);
-    return idx.getCount(idx.end(null, indexedValue));
-  }
-
-  @Override
-  public void close() throws IOException {
-    DB _db = this._db.getAndSet(null);
-    if (_db == null) {
-      return;
-    }
-
-    try {
-      _db.close();
-    } catch (IOException ioe) {
-      throw ioe;
-    } catch (Exception e) {
-      throw new IOException(e.getMessage(), e);
-    }
-  }
-
-  /** Returns metadata about indices for the given type. */
-  LevelDBTypeInfo getTypeInfo(Class<?> type) throws Exception {
-    LevelDBTypeInfo ti = types.get(type);
-    if (ti == null) {
-      LevelDBTypeInfo tmp = new LevelDBTypeInfo(this, type, getTypeAlias(type));
-      ti = types.putIfAbsent(type, tmp);
-      if (ti == null) {
-        ti = tmp;
-      }
-    }
-    return ti;
-  }
-
-  /**
-   * Try to avoid use-after close since that has the tendency of crashing the JVM. This doesn't
-   * prevent methods that retrieved the instance from using it after close, but hopefully will
-   * catch most cases; otherwise, we'll need some kind of locking.
-   */
-  DB db() {
-    DB _db = this._db.get();
-    if (_db == null) {
-      throw new IllegalStateException("DB is closed.");
-    }
-    return _db;
-  }
-
-  private byte[] getTypeAlias(Class<?> klass) throws Exception {
-    byte[] alias = typeAliases.get(klass.getName());
-    if (alias == null) {
-      synchronized (typeAliases) {
-        byte[] tmp = String.valueOf(typeAliases.size()).getBytes(UTF_8);
-        alias = typeAliases.putIfAbsent(klass.getName(), tmp);
-        if (alias == null) {
-          alias = tmp;
-          put(TYPE_ALIASES_KEY, new TypeAliases(typeAliases));
-        }
-      }
-    }
-    return alias;
-  }
-
-  /** Needs to be public for Jackson. */
-  public static class TypeAliases {
-
-    public Map<String, byte[]> aliases;
-
-    TypeAliases(Map<String, byte[]> aliases) {
-      this.aliases = aliases;
-    }
-
-    TypeAliases() {
-      this(null);
-    }
-
-  }
-
-  private static class PrefixCache {
-
-    private final Object entity;
-    private final Map<LevelDBTypeInfo.Index, byte[]> prefixes;
-
-    PrefixCache(Object entity) {
-      this.entity = entity;
-      this.prefixes = new HashMap<>();
-    }
-
-    byte[] getPrefix(LevelDBTypeInfo.Index idx) throws Exception {
-      byte[] prefix = null;
-      if (idx.isChild()) {
-        prefix = prefixes.get(idx.parent());
-        if (prefix == null) {
-          prefix = idx.parent().childPrefix(idx.parent().getValue(entity));
-          prefixes.put(idx.parent(), prefix);
-        }
-      }
-      return prefix;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java
deleted file mode 100644
index 263d45c..0000000
--- a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import org.iq80.leveldb.DBIterator;
-
-class LevelDBIterator<T> implements KVStoreIterator<T> {
-
-  private final LevelDB db;
-  private final boolean ascending;
-  private final DBIterator it;
-  private final Class<T> type;
-  private final LevelDBTypeInfo ti;
-  private final LevelDBTypeInfo.Index index;
-  private final byte[] indexKeyPrefix;
-  private final byte[] end;
-  private final long max;
-
-  private boolean checkedNext;
-  private byte[] next;
-  private boolean closed;
-  private long count;
-
-  LevelDBIterator(LevelDB db, KVStoreView<T> params) throws Exception {
-    this.db = db;
-    this.ascending = params.ascending;
-    this.it = db.db().iterator();
-    this.type = params.type;
-    this.ti = db.getTypeInfo(type);
-    this.index = ti.index(params.index);
-    this.max = params.max;
-
-    Preconditions.checkArgument(!index.isChild() || params.parent != null,
-      "Cannot iterate over child index %s without parent value.", params.index);
-    byte[] parent = index.isChild() ? index.parent().childPrefix(params.parent) : null;
-
-    this.indexKeyPrefix = index.keyPrefix(parent);
-
-    byte[] firstKey;
-    if (params.first != null) {
-      if (ascending) {
-        firstKey = index.start(parent, params.first);
-      } else {
-        firstKey = index.end(parent, params.first);
-      }
-    } else if (ascending) {
-      firstKey = index.keyPrefix(parent);
-    } else {
-      firstKey = index.end(parent);
-    }
-    it.seek(firstKey);
-
-    byte[] end = null;
-    if (ascending) {
-      if (params.last != null) {
-        end = index.end(parent, params.last);
-      } else {
-        end = index.end(parent);
-      }
-    } else {
-      if (params.last != null) {
-        end = index.start(parent, params.last);
-      }
-      if (it.hasNext()) {
-        // When descending, the caller may have set up the start of iteration at a non-existant
-        // entry that is guaranteed to be after the desired entry. For example, if you have a
-        // compound key (a, b) where b is a, integer, you may seek to the end of the elements that
-        // have the same "a" value by specifying Integer.MAX_VALUE for "b", and that value may not
-        // exist in the database. So need to check here whether the next value actually belongs to
-        // the set being returned by the iterator before advancing.
-        byte[] nextKey = it.peekNext().getKey();
-        if (compare(nextKey, indexKeyPrefix) <= 0) {
-          it.next();
-        }
-      }
-    }
-    this.end = end;
-
-    if (params.skip > 0) {
-      skip(params.skip);
-    }
-  }
-
-  @Override
-  public boolean hasNext() {
-    if (!checkedNext && !closed) {
-      next = loadNext();
-      checkedNext = true;
-    }
-    if (!closed && next == null) {
-      try {
-        close();
-      } catch (IOException ioe) {
-        throw Throwables.propagate(ioe);
-      }
-    }
-    return next != null;
-  }
-
-  @Override
-  public T next() {
-    if (!hasNext()) {
-      throw new NoSuchElementException();
-    }
-    checkedNext = false;
-
-    try {
-      T ret;
-      if (index == null || index.isCopy()) {
-        ret = db.serializer.deserialize(next, type);
-      } else {
-        byte[] key = ti.buildKey(false, ti.naturalIndex().keyPrefix(null), next);
-        ret = db.get(key, type);
-      }
-      next = null;
-      return ret;
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public void remove() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public List<T> next(int max) {
-    List<T> list = new ArrayList<>(max);
-    while (hasNext() && list.size() < max) {
-      list.add(next());
-    }
-    return list;
-  }
-
-  @Override
-  public boolean skip(long n) {
-    long skipped = 0;
-    while (skipped < n) {
-      if (next != null) {
-        checkedNext = false;
-        next = null;
-        skipped++;
-        continue;
-      }
-
-      boolean hasNext = ascending ? it.hasNext() : it.hasPrev();
-      if (!hasNext) {
-        checkedNext = true;
-        return false;
-      }
-
-      Map.Entry<byte[], byte[]> e = ascending ? it.next() : it.prev();
-      if (!isEndMarker(e.getKey())) {
-        skipped++;
-      }
-    }
-
-    return hasNext();
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    if (!closed) {
-      it.close();
-      closed = true;
-    }
-  }
-
-  private byte[] loadNext() {
-    if (count >= max) {
-      return null;
-    }
-
-    try {
-      while (true) {
-        boolean hasNext = ascending ? it.hasNext() : it.hasPrev();
-        if (!hasNext) {
-          return null;
-        }
-
-        Map.Entry<byte[], byte[]> nextEntry;
-        try {
-          // Avoid races if another thread is updating the DB.
-          nextEntry = ascending ? it.next() : it.prev();
-        } catch (NoSuchElementException e) {
-          return null;
-        }
-
-        byte[] nextKey = nextEntry.getKey();
-        // Next key is not part of the index, stop.
-        if (!startsWith(nextKey, indexKeyPrefix)) {
-          return null;
-        }
-
-        // If the next key is an end marker, then skip it.
-        if (isEndMarker(nextKey)) {
-          continue;
-        }
-
-        // If there's a known end key and iteration has gone past it, stop.
-        if (end != null) {
-          int comp = compare(nextKey, end) * (ascending ? 1 : -1);
-          if (comp > 0) {
-            return null;
-          }
-        }
-
-        count++;
-
-        // Next element is part of the iteration, return it.
-        return nextEntry.getValue();
-      }
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @VisibleForTesting
-  static boolean startsWith(byte[] key, byte[] prefix) {
-    if (key.length < prefix.length) {
-      return false;
-    }
-
-    for (int i = 0; i < prefix.length; i++) {
-      if (key[i] != prefix[i]) {
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  private boolean isEndMarker(byte[] key) {
-    return (key.length > 2 &&
-        key[key.length - 2] == LevelDBTypeInfo.KEY_SEPARATOR &&
-        key[key.length - 1] == LevelDBTypeInfo.END_MARKER[0]);
-  }
-
-  static int compare(byte[] a, byte[] b) {
-    int diff = 0;
-    int minLen = Math.min(a.length, b.length);
-    for (int i = 0; i < minLen; i++) {
-      diff += (a[i] - b[i]);
-      if (diff != 0) {
-        return diff;
-      }
-    }
-
-    return a.length - b.length;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java
deleted file mode 100644
index 722f54e..0000000
--- a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java
+++ /dev/null
@@ -1,511 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import java.lang.reflect.Array;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import com.google.common.base.Preconditions;
-import org.iq80.leveldb.WriteBatch;
-
-/**
- * Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected
- * via reflection, to make it cheaper to access it multiple times.
- *
- * <p>
- * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures
- * that iteration over indices is easy, and that updating values in the store is not overly
- * expensive. Of note, indices choose using more disk space (one value per key) instead of keeping
- * lists of pointers, which would be more expensive to update at runtime.
- * </p>
- *
- * <p>
- * Indentation defines when a sub-key lives under a parent key. In LevelDB, this means the full
- * key would be the concatenation of everything up to that point in the hierarchy, with each
- * component separated by a NULL byte.
- * </p>
- *
- * <pre>
- * +TYPE_NAME
- *   NATURAL_INDEX
- *     +NATURAL_KEY
- *     -
- *   -NATURAL_INDEX
- *   INDEX_NAME
- *     +INDEX_VALUE
- *       +NATURAL_KEY
- *     -INDEX_VALUE
- *     .INDEX_VALUE
- *       CHILD_INDEX_NAME
- *         +CHILD_INDEX_VALUE
- *           NATURAL_KEY_OR_DATA
- *         -
- *   -INDEX_NAME
- * </pre>
- *
- * <p>
- * Entity data (either the entity's natural key or a copy of the data) is stored in all keys
- * that end with "+<something>". A count of all objects that match a particular top-level index
- * value is kept at the end marker ("-<something>"). A count is also kept at the natural index's end
- * marker, to make it easy to retrieve the number of all elements of a particular type.
- * </p>
- *
- * <p>
- * To illustrate, given a type "Foo", with a natural index and a second index called "bar", you'd
- * have these keys and values in the store for two instances, one with natural key "key1" and the
- * other "key2", both with value "yes" for "bar":
- * </p>
- *
- * <pre>
- * Foo __main__ +key1   [data for instance 1]
- * Foo __main__ +key2   [data for instance 2]
- * Foo __main__ -       [count of all Foo]
- * Foo bar +yes +key1   [instance 1 key or data, depending on index type]
- * Foo bar +yes +key2   [instance 2 key or data, depending on index type]
- * Foo bar +yes -       [count of all Foo with "bar=yes" ]
- * </pre>
- *
- * <p>
- * Note that all indexed values are prepended with "+", even if the index itself does not have an
- * explicit end marker. This allows for easily skipping to the end of an index by telling LevelDB
- * to seek to the "phantom" end marker of the index. Throughout the code and comments, this part
- * of the full LevelDB key is generally referred to as the "index value" of the entity.
- * </p>
- *
- * <p>
- * Child indices are stored after their parent index. In the example above, let's assume there is
- * a child index "child", whose parent is "bar". If both instances have value "no" for this field,
- * the data in the store would look something like the following:
- * </p>
- *
- * <pre>
- * ...
- * Foo bar +yes -
- * Foo bar .yes .child +no +key1   [instance 1 key or data, depending on index type]
- * Foo bar .yes .child +no +key2   [instance 2 key or data, depending on index type]
- * ...
- * </pre>
- */
-class LevelDBTypeInfo {
-
-  static final byte[] END_MARKER = new byte[] { '-' };
-  static final byte ENTRY_PREFIX = (byte) '+';
-  static final byte KEY_SEPARATOR = 0x0;
-  static byte TRUE = (byte) '1';
-  static byte FALSE = (byte) '0';
-
-  private static final byte SECONDARY_IDX_PREFIX = (byte) '.';
-  private static final byte POSITIVE_MARKER = (byte) '=';
-  private static final byte NEGATIVE_MARKER = (byte) '*';
-  private static final byte[] HEX_BYTES = new byte[] {
-    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 
'f'
-  };
-
-  private final LevelDB db;
-  private final Class<?> type;
-  private final Map<String, Index> indices;
-  private final byte[] typePrefix;
-
-  LevelDBTypeInfo(LevelDB db, Class<?> type, byte[] alias) throws Exception {
-    this.db = db;
-    this.type = type;
-    this.indices = new HashMap<>();
-
-    KVTypeInfo ti = new KVTypeInfo(type);
-
-    // First create the parent indices, then the child indices.
-    ti.indices().forEach(idx -> {
-      if (idx.parent().isEmpty()) {
-        indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()), 
null));
-      }
-    });
-    ti.indices().forEach(idx -> {
-      if (!idx.parent().isEmpty()) {
-        indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()),
-          indices.get(idx.parent())));
-      }
-    });
-
-    this.typePrefix = alias;
-  }
-
-  Class<?> type() {
-    return type;
-  }
-
-  byte[] keyPrefix() {
-    return typePrefix;
-  }
-
-  Index naturalIndex() {
-    return index(KVIndex.NATURAL_INDEX_NAME);
-  }
-
-  Index index(String name) {
-    Index i = indices.get(name);
-    Preconditions.checkArgument(i != null, "Index %s does not exist for type 
%s.", name,
-      type.getName());
-    return i;
-  }
-
-  Collection<Index> indices() {
-    return indices.values();
-  }
-
-  byte[] buildKey(byte[]... components) {
-    return buildKey(true, components);
-  }
-
-  byte[] buildKey(boolean addTypePrefix, byte[]... components) {
-    int len = 0;
-    if (addTypePrefix) {
-      len += typePrefix.length + 1;
-    }
-    for (byte[] comp : components) {
-      len += comp.length;
-    }
-    len += components.length - 1;
-
-    byte[] dest = new byte[len];
-    int written = 0;
-
-    if (addTypePrefix) {
-      System.arraycopy(typePrefix, 0, dest, 0, typePrefix.length);
-      dest[typePrefix.length] = KEY_SEPARATOR;
-      written += typePrefix.length + 1;
-    }
-
-    for (byte[] comp : components) {
-      System.arraycopy(comp, 0, dest, written, comp.length);
-      written += comp.length;
-      if (written < dest.length) {
-        dest[written] = KEY_SEPARATOR;
-        written++;
-      }
-    }
-
-    return dest;
-  }
-
-  /**
-   * Models a single index in LevelDB. See top-level class's javadoc for a 
description of how the
-   * keys are generated.
-   */
-  class Index {
-
-    private final boolean copy;
-    private final boolean isNatural;
-    private final byte[] name;
-    private final KVTypeInfo.Accessor accessor;
-    private final Index parent;
-
-    private Index(KVIndex self, KVTypeInfo.Accessor accessor, Index parent) {
-      byte[] name = self.value().getBytes(UTF_8);
-      if (parent != null) {
-        byte[] child = new byte[name.length + 1];
-        child[0] = SECONDARY_IDX_PREFIX;
-        System.arraycopy(name, 0, child, 1, name.length);
-      }
-
-      this.name = name;
-      this.isNatural = self.value().equals(KVIndex.NATURAL_INDEX_NAME);
-      this.copy = isNatural || self.copy();
-      this.accessor = accessor;
-      this.parent = parent;
-    }
-
-    boolean isCopy() {
-      return copy;
-    }
-
-    boolean isChild() {
-      return parent != null;
-    }
-
-    Index parent() {
-      return parent;
-    }
-
-    /**
-     * Creates a key prefix for child indices of this index. This allows the 
prefix to be
-     * calculated only once, avoiding redundant work when multiple child 
indices of the
-     * same parent index exist.
-     */
-    byte[] childPrefix(Object value) throws Exception {
-      Preconditions.checkState(parent == null, "Not a parent index.");
-      return buildKey(name, toParentKey(value));
-    }
-
-    /**
-     * Gets the index value for a particular entity (which is the value of the 
field or method
-     * tagged with the index annotation). This is used as part of the LevelDB 
key where the
-     * entity (or its id) is stored.
-     */
-    Object getValue(Object entity) throws Exception {
-      return accessor.get(entity);
-    }
-
-    private void checkParent(byte[] prefix) {
-      if (prefix != null) {
-        Preconditions.checkState(parent != null, "Parent prefix provided for 
parent index.");
-      } else {
-        Preconditions.checkState(parent == null, "Parent prefix missing for 
child index.");
-      }
-    }
-
-    /** The prefix for all keys that belong to this index. */
-    byte[] keyPrefix(byte[] prefix) {
-      checkParent(prefix);
-      return (parent != null) ? buildKey(false, prefix, name) : buildKey(name);
-    }
-
-    /**
-     * The key where to start ascending iteration for entities whose value for 
the indexed field
-     * match the given value.
-     */
-    byte[] start(byte[] prefix, Object value) {
-      checkParent(prefix);
-      return (parent != null) ? buildKey(false, prefix, name, toKey(value))
-        : buildKey(name, toKey(value));
-    }
-
-    /** The key for the index's end marker. */
-    byte[] end(byte[] prefix) {
-      checkParent(prefix);
-      return (parent != null) ? buildKey(false, prefix, name, END_MARKER)
-        : buildKey(name, END_MARKER);
-    }
-
-    /** The key for the end marker for entries with the given value. */
-    byte[] end(byte[] prefix, Object value) throws Exception {
-      checkParent(prefix);
-      return (parent != null) ? buildKey(false, prefix, name, toKey(value), 
END_MARKER)
-        : buildKey(name, toKey(value), END_MARKER);
-    }
-
-    /** The full key in the index that identifies the given entity. */
-    byte[] entityKey(byte[] prefix, Object entity) throws Exception {
-      Object indexValue = getValue(entity);
-      Preconditions.checkNotNull(indexValue, "Null index value for %s in type 
%s.",
-        name, type.getName());
-      byte[] entityKey = start(prefix, indexValue);
-      if (!isNatural) {
-        entityKey = buildKey(false, entityKey, 
toKey(naturalIndex().getValue(entity)));
-      }
-      return entityKey;
-    }
-
-    private void updateCount(WriteBatch batch, byte[] key, long delta) throws 
Exception {
-      long updated = getCount(key) + delta;
-      if (updated > 0) {
-        batch.put(key, db.serializer.serialize(updated));
-      } else {
-        batch.delete(key);
-      }
-    }
-
-    private void addOrRemove(
-        WriteBatch batch,
-        Object entity,
-        Object existing,
-        byte[] data,
-        byte[] naturalKey,
-        byte[] prefix) throws Exception {
-      Object indexValue = getValue(entity);
-      Preconditions.checkNotNull(indexValue, "Null index value for %s in type 
%s.",
-        name, type.getName());
-
-      byte[] entityKey = start(prefix, indexValue);
-      if (!isNatural) {
-        entityKey = buildKey(false, entityKey, naturalKey);
-      }
-
-      boolean needCountUpdate = (existing == null);
-
-      // Check whether there's a need to update the index. The index needs to 
be updated in two
-      // cases:
-      //
-      // - There is no existing value for the entity, so a new index value 
will be added.
-      // - If there is a previously stored value for the entity, and the index 
value for the
-      //   current index does not match the new value, the old entry needs to 
be deleted and
-      //   the new one added.
-      //
-      // Natural indices don't need to be checked, because by definition both 
old and new entities
-      // will have the same key. The put() call is all that's needed in that 
case.
-      //
-      // Also check whether we need to update the counts. If the indexed value 
is changing, we
-      // need to decrement the count at the old index value, and the new 
indexed value count needs
-      // to be incremented.
-      if (existing != null && !isNatural) {
-        byte[] oldPrefix = null;
-        Object oldIndexedValue = getValue(existing);
-        boolean removeExisting = !indexValue.equals(oldIndexedValue);
-        if (!removeExisting && isChild()) {
-          oldPrefix = parent().childPrefix(parent().getValue(existing));
-          removeExisting = LevelDBIterator.compare(prefix, oldPrefix) != 0;
-        }
-
-        if (removeExisting) {
-          if (oldPrefix == null && isChild()) {
-            oldPrefix = parent().childPrefix(parent().getValue(existing));
-          }
-
-          byte[] oldKey = entityKey(oldPrefix, existing);
-          batch.delete(oldKey);
-
-          // If the indexed value has changed, we need to update the counts at 
the old and new
-          // end markers for the indexed value.
-          if (!isChild()) {
-            byte[] oldCountKey = end(null, oldIndexedValue);
-            updateCount(batch, oldCountKey, -1L);
-            needCountUpdate = true;
-          }
-        }
-      }
-
-      if (data != null) {
-        byte[] stored = copy ? data : naturalKey;
-        batch.put(entityKey, stored);
-      } else {
-        batch.delete(entityKey);
-      }
-
-      if (needCountUpdate && !isChild()) {
-        long delta = data != null ? 1L : -1L;
-        byte[] countKey = isNatural ? end(prefix) : end(prefix, indexValue);
-        updateCount(batch, countKey, delta);
-      }
-    }
-
-    /**
-     * Add an entry to the index.
-     *
-     * @param batch Write batch with other related changes.
-     * @param entity The entity being added to the index.
-     * @param existing The entity being replaced in the index, or null.
-     * @param data Serialized entity to store (when storing the entity, not a 
reference).
-     * @param naturalKey The value's natural key (to avoid re-computing it for 
every index).
-     * @param prefix The parent index prefix, if this is a child index.
-     */
-    void add(
-        WriteBatch batch,
-        Object entity,
-        Object existing,
-        byte[] data,
-        byte[] naturalKey,
-        byte[] prefix) throws Exception {
-      addOrRemove(batch, entity, existing, data, naturalKey, prefix);
-    }
-
-    /**
-     * Remove a value from the index.
-     *
-     * @param batch Write batch with other related changes.
-     * @param entity The entity being removed, to identify the index entry to 
modify.
-     * @param naturalKey The value's natural key (to avoid re-computing it for 
every index).
-     * @param prefix The parent index prefix, if this is a child index.
-     */
-    void remove(
-        WriteBatch batch,
-        Object entity,
-        byte[] naturalKey,
-        byte[] prefix) throws Exception {
-      addOrRemove(batch, entity, null, null, naturalKey, prefix);
-    }
-
-    long getCount(byte[] key) throws Exception {
-      byte[] data = db.db().get(key);
-      return data != null ? db.serializer.deserializeLong(data) : 0;
-    }
-
-    byte[] toParentKey(Object value) {
-      return toKey(value, SECONDARY_IDX_PREFIX);
-    }
-
-    byte[] toKey(Object value) {
-      return toKey(value, ENTRY_PREFIX);
-    }
-
-    /**
-     * Translates a value to be used as part of the store key.
-     *
-     * Integral numbers are encoded as a string in a way that preserves 
lexicographical
-     * ordering. The string is prepended with a marker telling whether the 
number is negative
-     * or positive ("*" for negative and "=" for positive are used since "-" 
and "+" have the
-     * opposite of the desired order), and then the number is encoded into a 
hex string (so
-     * it occupies twice the number of bytes as the original type).
-     *
-     * Arrays are encoded by encoding each element separately, separated by 
KEY_SEPARATOR.
-     */
-    byte[] toKey(Object value, byte prefix) {
-      final byte[] result;
-
-      if (value instanceof String) {
-        byte[] str = ((String) value).getBytes(UTF_8);
-        result = new byte[str.length + 1];
-        result[0] = prefix;
-        System.arraycopy(str, 0, result, 1, str.length);
-      } else if (value instanceof Boolean) {
-        result = new byte[] { prefix, (Boolean) value ? TRUE : FALSE };
-      } else if (value.getClass().isArray()) {
-        int length = Array.getLength(value);
-        byte[][] components = new byte[length][];
-        for (int i = 0; i < length; i++) {
-          components[i] = toKey(Array.get(value, i));
-        }
-        result = buildKey(false, components);
-      } else {
-        int bytes;
-
-        if (value instanceof Integer) {
-          bytes = Integer.SIZE;
-        } else if (value instanceof Long) {
-          bytes = Long.SIZE;
-        } else if (value instanceof Short) {
-          bytes = Short.SIZE;
-        } else if (value instanceof Byte) {
-          bytes = Byte.SIZE;
-        } else {
-          throw new IllegalArgumentException(String.format("Type %s not 
allowed as key.",
-            value.getClass().getName()));
-        }
-
-        bytes = bytes / Byte.SIZE;
-
-        byte[] key = new byte[bytes * 2 + 2];
-        long longValue = ((Number) value).longValue();
-        key[0] = prefix;
-        key[1] = longValue > 0 ? POSITIVE_MARKER : NEGATIVE_MARKER;
-
-        for (int i = 0; i < key.length - 2; i++) {
-          int masked = (int) ((longValue >>> (4 * i)) & 0xF);
-          key[key.length - i - 1] = HEX_BYTES[masked];
-        }
-
-        result = key;
-      }
-
-      return result;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/UnsupportedStoreVersionException.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/UnsupportedStoreVersionException.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/UnsupportedStoreVersionException.java
deleted file mode 100644
index 2ed246e..0000000
--- a/common/kvstore/src/main/java/org/apache/spark/kvstore/UnsupportedStoreVersionException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import java.io.IOException;
-
-/**
- * Exception thrown when the store implementation is not compatible with the underlying data.
- */
-public class UnsupportedStoreVersionException extends IOException {
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java
new file mode 100644
index 0000000..9bc8c55
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.util.Arrays;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A factory for array wrappers so that arrays can be used as keys in a map, sorted or not.
+ *
+ * The comparator implementation makes two assumptions:
+ * - All elements are instances of Comparable
+ * - When comparing two arrays, they both contain elements of the same type in corresponding
+ *   indices.
+ *
+ * Otherwise, ClassCastExceptions may occur. The equality method can compare any two arrays.
+ *
+ * This class is not efficient and is mostly meant to compare really small arrays, like those
+ * generally used as indices and keys in a KVStore.
+ */
+class ArrayWrappers {
+
+  /**
+   * Wraps an array in an object that can be compared, hashed and tested for equality based on
+   * the array's contents.
+   *
+   * @param a The array to wrap.
+   * @return A wrapper specialized for {@code int[]}, {@code long[]} or {@code byte[]}, or a
+   *         generic wrapper for arrays of objects.
+   * @throws IllegalArgumentException If {@code a} is not an array, or is an array of any other
+   *         primitive type.
+   */
+  @SuppressWarnings("unchecked")
+  public static Comparable<Object> forArray(Object a) {
+    Preconditions.checkArgument(a.getClass().isArray());
+    Class<?> elementType = a.getClass().getComponentType();
+
+    final Comparable<?> wrapper;
+    if (elementType == int.class) {
+      wrapper = new ComparableIntArray((int[]) a);
+    } else if (elementType == long.class) {
+      wrapper = new ComparableLongArray((long[]) a);
+    } else if (elementType == byte.class) {
+      wrapper = new ComparableByteArray((byte[]) a);
+    } else {
+      // Remaining primitive arrays cannot be cast to Object[], so reject them explicitly.
+      Preconditions.checkArgument(!elementType.isPrimitive());
+      wrapper = new ComparableObjectArray((Object[]) a);
+    }
+    return (Comparable<Object>) wrapper;
+  }
+
+  /**
+   * Wrapper that gives an {@code int[]} content-based equality, hashing and ordering. The
+   * wrapped array is not copied.
+   */
+  private static class ComparableIntArray implements Comparable<ComparableIntArray> {
+
+    private final int[] array;
+
+    ComparableIntArray(int[] array) {
+      this.array = array;
+    }
+
+    /** Two wrappers are equal when their arrays have the same length and elements. */
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof ComparableIntArray)) {
+        return false;
+      }
+      return Arrays.equals(array, ((ComparableIntArray) other).array);
+    }
+
+    @Override
+    public int hashCode() {
+      int code = 0;
+      for (int i = 0; i < array.length; i++) {
+        code = (code * 31) + array[i];
+      }
+      return code;
+    }
+
+    /**
+     * Orders arrays element by element; when one array is a prefix of the other, the shorter
+     * one sorts first.
+     */
+    @Override
+    public int compareTo(ComparableIntArray other) {
+      int len = Math.min(array.length, other.array.length);
+      for (int i = 0; i < len; i++) {
+        // Subtracting the elements can overflow (e.g. Integer.MIN_VALUE - 1) and flip the sign
+        // of the result, so compare the values instead.
+        int diff = Integer.compare(array[i], other.array[i]);
+        if (diff != 0) {
+          return diff;
+        }
+      }
+
+      return array.length - other.array.length;
+    }
+  }
+
+  /**
+   * Wrapper that gives a {@code long[]} content-based equality, hashing and ordering. The
+   * wrapped array is not copied.
+   */
+  private static class ComparableLongArray implements Comparable<ComparableLongArray> {
+
+    private final long[] array;
+
+    ComparableLongArray(long[] array) {
+      this.array = array;
+    }
+
+    /** Two wrappers are equal when their arrays have the same length and elements. */
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof ComparableLongArray)) {
+        return false;
+      }
+      return Arrays.equals(array, ((ComparableLongArray) other).array);
+    }
+
+    @Override
+    public int hashCode() {
+      int code = 0;
+      for (int i = 0; i < array.length; i++) {
+        // Only the low 32 bits of each element contribute to the hash.
+        code = (code * 31) + (int) array[i];
+      }
+      return code;
+    }
+
+    /**
+     * Orders arrays element by element; when one array is a prefix of the other, the shorter
+     * one sorts first.
+     */
+    @Override
+    public int compareTo(ComparableLongArray other) {
+      int len = Math.min(array.length, other.array.length);
+      for (int i = 0; i < len; i++) {
+        // Subtracting the elements can overflow (e.g. Long.MIN_VALUE - 1) and flip the sign of
+        // the result, so compare the values instead.
+        int diff = Long.compare(array[i], other.array[i]);
+        if (diff != 0) {
+          return diff;
+        }
+      }
+
+      return array.length - other.array.length;
+    }
+  }
+
+  /**
+   * Wrapper that gives a {@code byte[]} content-based equality, hashing and ordering. Elements
+   * are compared as signed values.
+   */
+  private static class ComparableByteArray implements Comparable<ComparableByteArray> {
+
+    private final byte[] array;
+
+    ComparableByteArray(byte[] array) {
+      this.array = array;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof ComparableByteArray
+        && Arrays.equals(array, ((ComparableByteArray) other).array);
+    }
+
+    @Override
+    public int hashCode() {
+      int hash = 0;
+      for (byte element : array) {
+        hash = (hash * 31) + element;
+      }
+      return hash;
+    }
+
+    /**
+     * Returns the difference of the first pair of elements that differ, or the difference in
+     * length when one array is a prefix of the other.
+     */
+    @Override
+    public int compareTo(ComparableByteArray other) {
+      byte[] mine = array;
+      byte[] theirs = other.array;
+      int shared = Math.min(mine.length, theirs.length);
+      int idx = 0;
+      while (idx < shared) {
+        // Bytes are widened to int before subtracting, so the difference cannot overflow.
+        if (mine[idx] != theirs[idx]) {
+          return mine[idx] - theirs[idx];
+        }
+        idx++;
+      }
+
+      return mine.length - theirs.length;
+    }
+  }
+
+  /**
+   * Wrapper that gives an array of objects content-based equality, hashing and ordering.
+   * Ordering requires every element to be a {@link Comparable} that accepts the element at the
+   * same position in the other array.
+   */
+  private static class ComparableObjectArray implements Comparable<ComparableObjectArray> {
+
+    private final Object[] array;
+
+    ComparableObjectArray(Object[] array) {
+      this.array = array;
+    }
+
+    /** Two wrappers are equal when their arrays have the same length and equal elements. */
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof ComparableObjectArray)) {
+        return false;
+      }
+      return Arrays.equals(array, ((ComparableObjectArray) other).array);
+    }
+
+    @Override
+    public int hashCode() {
+      int code = 0;
+      for (int i = 0; i < array.length; i++) {
+        // equals() accepts null elements (Arrays.equals does), so hashing must not throw on
+        // them either; a null element contributes 0.
+        code = (code * 31) + (array[i] == null ? 0 : array[i].hashCode());
+      }
+      return code;
+    }
+
+    /**
+     * Orders arrays element by element using the elements' natural ordering; when one array is
+     * a prefix of the other, the shorter one sorts first.
+     *
+     * @throws ClassCastException If an element is not a Comparable compatible with its
+     *         counterpart in the other array.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public int compareTo(ComparableObjectArray other) {
+      int len = Math.min(array.length, other.array.length);
+      for (int i = 0; i < len; i++) {
+        int diff = ((Comparable<Object>) array[i]).compareTo(other.array[i]);
+        if (diff != 0) {
+          return diff;
+        }
+      }
+
+      return array.length - other.array.length;
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to