http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
new file mode 100644
index 0000000..9cae5da
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * Implementation of KVStore that keeps data deserialized in memory. This 
store does not index
+ * data; instead, whenever iterating over an indexed field, the stored data is 
copied and sorted
+ * according to the index. This saves memory but makes iteration more 
expensive.
+ */
+@Private
+public class InMemoryStore implements KVStore {
+
+  private Object metadata;
+  private ConcurrentMap<Class<?>, InstanceList> data = new 
ConcurrentHashMap<>();
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) {
+    return klass.cast(metadata);
+  }
+
+  @Override
+  public void setMetadata(Object value) {
+    this.metadata = value;
+  }
+
+  @Override
+  public long count(Class<?> type) {
+    InstanceList list = data.get(type);
+    return list != null ? list.size() : 0;
+  }
+
+  @Override
+  public long count(Class<?> type, String index, Object indexedValue) throws 
Exception {
+    InstanceList list = data.get(type);
+    int count = 0;
+    Object comparable = asKey(indexedValue);
+    KVTypeInfo.Accessor accessor = list.getIndexAccessor(index);
+    for (Object o : view(type)) {
+      if (Objects.equal(comparable, asKey(accessor.get(o)))) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) {
+    InstanceList list = data.get(klass);
+    Object value = list != null ? list.get(naturalKey) : null;
+    if (value == null) {
+      throw new NoSuchElementException();
+    }
+    return klass.cast(value);
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    InstanceList list = data.computeIfAbsent(value.getClass(), key -> {
+      try {
+        return new InstanceList(key);
+      } catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    });
+    list.put(value);
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) {
+    InstanceList list = data.get(type);
+    if (list != null) {
+      list.delete(naturalKey);
+    }
+  }
+
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type){
+    InstanceList list = data.get(type);
+    return list != null ? list.view(type)
+      : new InMemoryView<>(type, Collections.<T>emptyList(), null);
+  }
+
+  @Override
+  public void close() {
+    metadata = null;
+    data.clear();
+  }
+
+  @SuppressWarnings("unchecked")
+  private static Comparable<Object> asKey(Object in) {
+    if (in.getClass().isArray()) {
+      in = ArrayWrappers.forArray(in);
+    }
+    return (Comparable<Object>) in;
+  }
+
+  private static class InstanceList {
+
+    private final KVTypeInfo ti;
+    private final KVTypeInfo.Accessor naturalKey;
+    private final ConcurrentMap<Comparable<Object>, Object> data;
+
+    private int size;
+
+    private InstanceList(Class<?> type) throws Exception {
+      this.ti = new KVTypeInfo(type);
+      this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
+      this.data = new ConcurrentHashMap<>();
+      this.size = 0;
+    }
+
+    KVTypeInfo.Accessor getIndexAccessor(String indexName) {
+      return ti.getAccessor(indexName);
+    }
+
+    public Object get(Object key) {
+      return data.get(asKey(key));
+    }
+
+    public void put(Object value) throws Exception {
+      Preconditions.checkArgument(ti.type().equals(value.getClass()),
+        "Unexpected type: %s", value.getClass());
+      if (data.put(asKey(naturalKey.get(value)), value) == null) {
+        size++;
+      }
+    }
+
+    public void delete(Object key) {
+      if (data.remove(asKey(key)) != null) {
+        size--;
+      }
+    }
+
+    public int size() {
+      return size;
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> InMemoryView<T> view(Class<T> type) {
+      Preconditions.checkArgument(ti.type().equals(type), "Unexpected type: 
%s", type);
+      Collection<T> all = (Collection<T>) data.values();
+      return new InMemoryView(type, all, ti);
+    }
+
+  }
+
+  private static class InMemoryView<T> extends KVStoreView<T> {
+
+    private final Collection<T> elements;
+    private final KVTypeInfo ti;
+    private final KVTypeInfo.Accessor natural;
+
+    InMemoryView(Class<T> type, Collection<T> elements, KVTypeInfo ti) {
+      super(type);
+      this.elements = elements;
+      this.ti = ti;
+      this.natural = ti != null ? ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) : 
null;
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+      if (elements.isEmpty()) {
+        return new InMemoryIterator<>(elements.iterator());
+      }
+
+      try {
+        KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : 
null;
+        int modifier = ascending ? 1 : -1;
+
+        final List<T> sorted = copyElements();
+        Collections.sort(sorted, (e1, e2) -> modifier * compare(e1, e2, 
getter));
+        Stream<T> stream = sorted.stream();
+
+        if (first != null) {
+          stream = stream.filter(e -> modifier * compare(e, getter, first) >= 
0);
+        }
+
+        if (last != null) {
+          stream = stream.filter(e -> modifier * compare(e, getter, last) <= 
0);
+        }
+
+        if (skip > 0) {
+          stream = stream.skip(skip);
+        }
+
+        if (max < sorted.size()) {
+          stream = stream.limit((int) max);
+        }
+
+        return new InMemoryIterator<>(stream.iterator());
+      } catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+
+    /**
+     * Create a copy of the input elements, filtering the values for child 
indices if needed.
+     */
+    private List<T> copyElements() {
+      if (parent != null) {
+        KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index);
+        Preconditions.checkArgument(parentGetter != null, "Parent filter for 
non-child index.");
+
+        return elements.stream()
+          .filter(e -> compare(e, parentGetter, parent) == 0)
+          .collect(Collectors.toList());
+      } else {
+        return new ArrayList<>(elements);
+      }
+    }
+
+    private int compare(T e1, T e2, KVTypeInfo.Accessor getter) {
+      try {
+        int diff = compare(e1, getter, getter.get(e2));
+        if (diff == 0 && getter != natural) {
+          diff = compare(e1, natural, natural.get(e2));
+        }
+        return diff;
+      } catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+
+    private int compare(T e1, KVTypeInfo.Accessor getter, Object v2) {
+      try {
+        return asKey(getter.get(e1)).compareTo(asKey(v2));
+      } catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+
+  }
+
+  private static class InMemoryIterator<T> implements KVStoreIterator<T> {
+
+    private final Iterator<T> iter;
+
+    InMemoryIterator(Iterator<T> iter) {
+      this.iter = iter;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return iter.hasNext();
+    }
+
+    @Override
+    public T next() {
+      return iter.next();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<T> next(int max) {
+      List<T> list = new ArrayList<>(max);
+      while (hasNext() && list.size() < max) {
+        list.add(next());
+      }
+      return list;
+    }
+
+    @Override
+    public boolean skip(long n) {
+      long skipped = 0;
+      while (skipped < n) {
+        if (hasNext()) {
+          next();
+          skipped++;
+        } else {
+          return false;
+        }
+      }
+
+      return hasNext();
+    }
+
+    @Override
+    public void close() {
+      // no op.
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVIndex.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVIndex.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVIndex.java
new file mode 100644
index 0000000..80f4921
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVIndex.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * Tags a field to be indexed when storing an object.
+ *
+ * <p>
+ * Types are required to have a natural index that uniquely identifies instances in the store.
+ * The default value of the annotation identifies the natural index for the type.
+ * </p>
+ *
+ * <p>
+ * Indexes allow for more efficient sorting of data read from the store. By annotating a field or
+ * "getter" method with this annotation, an index will be created that will provide sorting based
+ * on the string value of that field.
+ * </p>
+ *
+ * <p>
+ * Note that creating indices means more space will be needed, and maintenance operations like
+ * updating or deleting a value will become more expensive.
+ * </p>
+ *
+ * <p>
+ * Indices are restricted to String, integral types (byte, short, int, long, boolean), and arrays
+ * of those values.
+ * </p>
+ */
+@Private
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD, ElementType.METHOD})
+public @interface KVIndex {
+
+  /**
+   * Name of the natural index. This is also the default {@link #value()} of the annotation.
+   */
+  String NATURAL_INDEX_NAME = "__main__";
+
+  /**
+   * The name of the index to be created for the annotated entity. Must be unique within
+   * the class. Index names are not allowed to start with an underscore (that's reserved for
+   * internal use). The default value is the natural index name (which is always a copy index
+   * regardless of the annotation's values).
+   */
+  String value() default NATURAL_INDEX_NAME;
+
+  /**
+   * The name of the parent index of this index. By default there is no parent index, so the
+   * generated data can be retrieved without having to provide a parent value.
+   *
+   * <p>
+   * If a parent index is defined, iterating over the data using the index will require providing
+   * a single value for the parent index. This serves as a rudimentary way to provide
+   * relationships between entities in the store.
+   * </p>
+   */
+  String parent() default "";
+
+  /**
+   * Whether to copy the instance's data to the index, instead of just storing a pointer to the
+   * data. The default behavior is to just store a reference; that saves disk space but is slower
+   * to read, since there's a level of indirection.
+   */
+  boolean copy() default false;
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStore.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStore.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStore.java
new file mode 100644
index 0000000..72d06a8
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStore.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.io.Closeable;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * Abstraction for a local key/value store for storing app data.
+ *
+ * <p>
+ * There are two main features provided by the implementations of this interface:
+ * </p>
+ *
+ * <h3>Serialization</h3>
+ *
+ * <p>
+ * If the underlying data store requires serialization, data will be serialized to and
+ * deserialized using a {@link KVStoreSerializer}, which can be customized by the application.
+ * The serializer is based on Jackson, so it supports all the Jackson annotations for controlling
+ * the serialization of app-defined types.
+ * </p>
+ *
+ * <p>
+ * Data is also automatically compressed to save disk space.
+ * </p>
+ *
+ * <h3>Automatic Key Management</h3>
+ *
+ * <p>
+ * When using the built-in key management, the implementation will automatically create unique
+ * keys for each type written to the store. Keys are based on the type name, and always start
+ * with the "+" prefix character (so that it's easy to use both manual and automatic key
+ * management APIs without conflicts).
+ * </p>
+ *
+ * <p>
+ * Another feature of automatic key management is indexing; by annotating fields or methods of
+ * objects written to the store with {@link KVIndex}, indices are created to sort the data
+ * by the values of those properties. This makes it possible to provide sorting without having
+ * to load all instances of those types from the store.
+ * </p>
+ *
+ * <p>
+ * KVStore instances are thread-safe for both reads and writes.
+ * </p>
+ */
+@Private
+public interface KVStore extends Closeable {
+
+  /**
+   * Returns app-specific metadata from the store, or null if it's not currently set.
+   *
+   * <p>
+   * The metadata type is application-specific. This is a convenience method so that applications
+   * don't need to define their own keys for this information.
+   * </p>
+   *
+   * @param klass The type the metadata is expected to have.
+   */
+  <T> T getMetadata(Class<T> klass) throws Exception;
+
+  /**
+   * Writes the given value in the store metadata key.
+   *
+   * @param value The metadata to store.
+   */
+  void setMetadata(Object value) throws Exception;
+
+  /**
+   * Read a specific instance of an object.
+   *
+   * @param klass The object's type.
+   * @param naturalKey The object's "natural key", which uniquely identifies it. Null keys
+   *                   are not allowed.
+   * @throws java.util.NoSuchElementException If an element with the given key does not exist.
+   */
+  <T> T read(Class<T> klass, Object naturalKey) throws Exception;
+
+  /**
+   * Writes the given object to the store, including indexed fields. Indices are updated based
+   * on the annotated fields of the object's class.
+   *
+   * <p>
+   * Writes may be slower when the object already exists in the store, since it will involve
+   * updating existing indices.
+   * </p>
+   *
+   * @param value The object to write.
+   */
+  void write(Object value) throws Exception;
+
+  // NOTE(review): InMemoryStore.delete silently ignores a missing key instead of throwing
+  // NoSuchElementException; confirm which behavior is the intended contract.
+  /**
+   * Removes an object and all data related to it, like index entries, from the store.
+   *
+   * @param type The object's type.
+   * @param naturalKey The object's "natural key", which uniquely identifies it. Null keys
+   *                   are not allowed.
+   * @throws java.util.NoSuchElementException If an element with the given key does not exist.
+   */
+  void delete(Class<?> type, Object naturalKey) throws Exception;
+
+  /**
+   * Returns a configurable view for iterating over entities of the given type.
+   *
+   * @param type The type of the entities to iterate over.
+   */
+  <T> KVStoreView<T> view(Class<T> type) throws Exception;
+
+  /**
+   * Returns the number of items of the given type currently in the store.
+   *
+   * @param type The type of the items to count.
+   */
+  long count(Class<?> type) throws Exception;
+
+  /**
+   * Returns the number of items of the given type which match the given indexed value.
+   *
+   * @param type The type of the items to count.
+   * @param index The name of the index whose value is compared.
+   * @param indexedValue The index value an item must have to be counted.
+   */
+  long count(Class<?> type, String index, Object indexedValue) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreIterator.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreIterator.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreIterator.java
new file mode 100644
index 0000000..28a432b
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreIterator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * An iterator for KVStore.
+ *
+ * <p>
+ * Iterators may keep references to resources that need to be closed. It's recommended that users
+ * explicitly close iterators after they're used.
+ * </p>
+ */
+@Private
+public interface KVStoreIterator<T> extends Iterator<T>, AutoCloseable {
+
+  /**
+   * Retrieve multiple elements from the store.
+   *
+   * @param max Maximum number of elements to retrieve.
+   * @return A list with the retrieved elements. The in-memory implementation returns fewer than
+   *         {@code max} elements when the iterator is exhausted first.
+   */
+  List<T> next(int max);
+
+  /**
+   * Skip in the iterator.
+   *
+   * @param n Number of elements to skip.
+   * @return Whether there are items left after skipping.
+   */
+  boolean skip(long n);
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
new file mode 100644
index 0000000..bd8d948
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * Serializer used to translate between app-defined types and the LevelDB store.
+ *
+ * <p>
+ * The serializer is based on Jackson, so values are written as JSON. It also allows "naked
+ * strings" and integers to be written as values directly, which will be written as UTF-8
+ * strings.
+ * </p>
+ */
+@Private
+public class KVStoreSerializer {
+
+  /**
+   * Object mapper used to process app-specific types. If an application requires a specific
+   * configuration of the mapper, it can subclass this serializer and add custom configuration
+   * to this object.
+   */
+  protected final ObjectMapper mapper;
+
+  public KVStoreSerializer() {
+    this.mapper = new ObjectMapper();
+  }
+
+  /**
+   * Serializes a value. Strings are encoded directly as UTF-8 bytes; any other value is written
+   * by the mapper into a gzip-compressed stream.
+   *
+   * @param o The value to serialize.
+   * @return The serialized bytes.
+   */
+  public final byte[] serialize(Object o) throws Exception {
+    if (o instanceof String) {
+      return ((String) o).getBytes(UTF_8);
+    } else {
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+      // try-with-resources keeps a write failure as the primary exception even if close()
+      // fails too. The stream must be closed before the bytes are read so the gzip trailer
+      // is flushed.
+      try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
+        mapper.writeValue(out, o);
+      }
+      return bytes.toByteArray();
+    }
+  }
+
+  /**
+   * Reverses {@link #serialize(Object)}. When {@code klass} is String the bytes are decoded as
+   * UTF-8; otherwise they are gunzipped and read by the mapper.
+   *
+   * @param data The serialized bytes.
+   * @param klass The type to deserialize into.
+   * @return The deserialized value.
+   */
+  @SuppressWarnings("unchecked")
+  public final <T> T deserialize(byte[] data, Class<T> klass) throws Exception {
+    if (klass.equals(String.class)) {
+      return (T) new String(data, UTF_8);
+    } else {
+      try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
+        return mapper.readValue(in, klass);
+      }
+    }
+  }
+
+  /**
+   * Serializes a long as the UTF-8 bytes of its decimal string form.
+   */
+  final byte[] serialize(long value) {
+    return String.valueOf(value).getBytes(UTF_8);
+  }
+
+  /**
+   * Parses a long from bytes produced by {@link #serialize(long)}.
+   */
+  final long deserializeLong(byte[] data) {
+    return Long.parseLong(new String(data, UTF_8));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreView.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreView.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreView.java
new file mode 100644
index 0000000..8ea79bb
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreView.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * A configurable view that allows iterating over values in a {@link KVStore}.
+ *
+ * <p>
+ * The different methods can be used to configure the behavior of the 
iterator. Calling the same
+ * method multiple times is allowed; the most recent value will be used.
+ * </p>
+ *
+ * <p>
+ * The iterators returned by this view are of type {@link KVStoreIterator}; 
they auto-close
+ * when used in a for loop that exhausts their contents, but when used 
manually, they need
+ * to be closed explicitly unless all elements are read.
+ * </p>
+ */
+@Private
+public abstract class KVStoreView<T> implements Iterable<T> {
+
+  final Class<T> type;
+
+  boolean ascending = true;
+  String index = KVIndex.NATURAL_INDEX_NAME;
+  Object first = null;
+  Object last = null;
+  Object parent = null;
+  long skip = 0L;
+  long max = Long.MAX_VALUE;
+
+  public KVStoreView(Class<T> type) {
+    this.type = type;
+  }
+
+  /**
+   * Reverses the order of iteration. By default, iterates in ascending order.
+   */
+  public KVStoreView<T> reverse() {
+    ascending = !ascending;
+    return this;
+  }
+
+  /**
+   * Iterates according to the given index.
+   */
+  public KVStoreView<T> index(String name) {
+    this.index = Preconditions.checkNotNull(name);
+    return this;
+  }
+
+  /**
+   * Defines the value of the parent index when iterating over a child index. 
Only elements that
+   * match the parent index's value will be included in the iteration.
+   *
+   * <p>
+   * Required for iterating over child indices, will generate an error if 
iterating over a
+   * parent-less index.
+   * </p>
+   */
+  public KVStoreView<T> parent(Object value) {
+    this.parent = value;
+    return this;
+  }
+
+  /**
+   * Iterates starting at the given value of the chosen index (inclusive).
+   */
+  public KVStoreView<T> first(Object value) {
+    this.first = value;
+    return this;
+  }
+
+  /**
+   * Stops iteration at the given value of the chosen index (inclusive).
+   */
+  public KVStoreView<T> last(Object value) {
+    this.last = value;
+    return this;
+  }
+
+  /**
+   * Stops iteration after a number of elements has been retrieved.
+   */
+  public KVStoreView<T> max(long max) {
+    Preconditions.checkArgument(max > 0L, "max must be positive.");
+    this.max = max;
+    return this;
+  }
+
+  /**
+   * Skips a number of elements at the start of iteration. Skipped elements 
are not accounted
+   * when using {@link #max(long)}.
+   */
+  public KVStoreView<T> skip(long n) {
+    this.skip = n;
+    return this;
+  }
+
+  /**
+   * Returns an iterator for the current configuration.
+   */
+  public KVStoreIterator<T> closeableIterator() throws Exception {
+    return (KVStoreIterator<T>) iterator();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
new file mode 100644
index 0000000..a2b077e
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * Wrapper around types managed in a KVStore, providing easy access to their indexed fields.
+ *
+ * <p>
+ * Indices are discovered from {@link KVIndex} annotations on the fields and methods declared
+ * directly by the wrapped type, and are validated when the instance is created.
+ * </p>
+ */
+@Private
+public class KVTypeInfo {
+
+  private final Class<?> type;
+  private final Map<String, KVIndex> indices;
+  private final Map<String, Accessor> accessors;
+
+  /**
+   * Collects and validates the indices declared by the given type.
+   *
+   * @param type The type to inspect.
+   * @throws IllegalArgumentException If an index is unnamed, duplicated, uses a disallowed name,
+   *   is its own parent, is declared on a method with parameters, refers to a missing or
+   *   non-top-level parent, or if the type has no valid natural index.
+   */
+  public KVTypeInfo(Class<?> type) throws Exception {
+    this.type = type;
+    this.accessors = new HashMap<>();
+    this.indices = new HashMap<>();
+
+    for (Field f : type.getDeclaredFields()) {
+      KVIndex idx = f.getAnnotation(KVIndex.class);
+      if (idx != null) {
+        checkIndex(idx, indices);
+        indices.put(idx.value(), idx);
+        f.setAccessible(true);
+        accessors.put(idx.value(), new FieldAccessor(f));
+      }
+    }
+
+    for (Method m : type.getDeclaredMethods()) {
+      KVIndex idx = m.getAnnotation(KVIndex.class);
+      if (idx != null) {
+        checkIndex(idx, indices);
+        Preconditions.checkArgument(m.getParameterTypes().length == 0,
+          "Annotated method %s::%s should not have any parameters.", type.getName(), m.getName());
+        indices.put(idx.value(), idx);
+        m.setAccessible(true);
+        accessors.put(idx.value(), new MethodAccessor(m));
+      }
+    }
+
+    Preconditions.checkArgument(indices.containsKey(KVIndex.NATURAL_INDEX_NAME),
+        "No natural index defined for type %s.", type.getName());
+    Preconditions.checkArgument(indices.get(KVIndex.NATURAL_INDEX_NAME).parent().isEmpty(),
+        "Natural index of %s cannot have a parent.", type.getName());
+
+    // Parent references can only be validated once every index of the type is known.
+    for (KVIndex idx : indices.values()) {
+      if (!idx.parent().isEmpty()) {
+        KVIndex parent = indices.get(idx.parent());
+        Preconditions.checkArgument(parent != null,
+          "Cannot find parent %s of index %s.", idx.parent(), idx.value());
+        Preconditions.checkArgument(parent.parent().isEmpty(),
+          "Parent index %s of index %s cannot be itself a child index.", idx.parent(), idx.value());
+      }
+    }
+  }
+
+  /**
+   * Validates a single index declaration against the indices registered so far.
+   *
+   * @param idx The index annotation being registered.
+   * @param indices The indices already registered, keyed by name.
+   */
+  private void checkIndex(KVIndex idx, Map<String, KVIndex> indices) {
+    Preconditions.checkArgument(idx.value() != null && !idx.value().isEmpty(),
+      "No name provided for index in type %s.", type.getName());
+    // Names starting with "_" are not allowed, except for the natural index name.
+    Preconditions.checkArgument(
+      !idx.value().startsWith("_") || idx.value().equals(KVIndex.NATURAL_INDEX_NAME),
+      "Index name %s (in type %s) is not allowed.", idx.value(), type.getName());
+    Preconditions.checkArgument(idx.parent().isEmpty() || !idx.parent().equals(idx.value()),
+      "Index %s cannot be parent of itself.", idx.value());
+    Preconditions.checkArgument(!indices.containsKey(idx.value()),
+      "Duplicate index %s for type %s.", idx.value(), type.getName());
+  }
+
+  /** Returns the type wrapped by this instance. */
+  public Class<?> type() {
+    return type;
+  }
+
+  /**
+   * Reads the value of the given index from an instance of the wrapped type.
+   *
+   * @throws IllegalArgumentException If the index does not exist.
+   */
+  public Object getIndexValue(String indexName, Object instance) throws Exception {
+    return getAccessor(indexName).get(instance);
+  }
+
+  /** Returns a stream over all index declarations found in the wrapped type. */
+  public Stream<KVIndex> indices() {
+    return indices.values().stream();
+  }
+
+  /**
+   * Returns the accessor used to read the given index from an instance.
+   *
+   * @throws IllegalArgumentException If the index does not exist.
+   */
+  Accessor getAccessor(String indexName) {
+    Accessor a = accessors.get(indexName);
+    Preconditions.checkArgument(a != null, "No index %s.", indexName);
+    return a;
+  }
+
+  /**
+   * Returns the accessor for the parent of the given index, or null if the index has no parent.
+   *
+   * @throws IllegalArgumentException If the index does not exist.
+   */
+  Accessor getParentAccessor(String indexName) {
+    KVIndex index = indices.get(indexName);
+    // Fail the same way getAccessor() does, instead of with a NullPointerException.
+    Preconditions.checkArgument(index != null, "No index %s.", indexName);
+    return index.parent().isEmpty() ? null : getAccessor(index.parent());
+  }
+
+  /**
+   * Abstracts the difference between invoking a Field and a Method.
+   */
+  interface Accessor {
+
+    Object get(Object instance) throws Exception;
+
+  }
+
+  /** Reads an index value from an annotated field. */
+  private static class FieldAccessor implements Accessor {
+
+    private final Field field;
+
+    FieldAccessor(Field field) {
+      this.field = field;
+    }
+
+    @Override
+    public Object get(Object instance) throws Exception {
+      return field.get(instance);
+    }
+
+  }
+
+  /** Reads an index value by invoking an annotated, parameter-less method. */
+  private static class MethodAccessor implements Accessor {
+
+    private final Method method;
+
+    MethodAccessor(Method method) {
+      this.method = method;
+    }
+
+    @Override
+    public Object get(Object instance) throws Exception {
+      return method.invoke(instance);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
new file mode 100644
index 0000000..310febc
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * Implementation of KVStore that uses LevelDB as the underlying data store.
+ */
+@Private
+public class LevelDB implements KVStore {
+
+  @VisibleForTesting
+  static final long STORE_VERSION = 1L;
+
+  @VisibleForTesting
+  static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
+
+  /** DB key where app metadata is stored. */
+  private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
+
+  /** DB key where type aliases are stored. */
+  private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8);
+
+  /** Holds the open DB handle; set to null once the store is closed. */
+  final AtomicReference<DB> _db;
+  final KVStoreSerializer serializer;
+
+  /**
+   * Keep a mapping of class names to a shorter, unique ID managed by the store. This serves two
+   * purposes: make the keys stored on disk shorter, and spread out the keys, since class names
+   * will often have a long, redundant prefix (think "org.apache.spark.").
+   */
+  private final ConcurrentMap<String, byte[]> typeAliases;
+  private final ConcurrentMap<Class<?>, LevelDBTypeInfo> types;
+
+  /** Opens (or creates) the store at the given path using a default serializer. */
+  public LevelDB(File path) throws Exception {
+    this(path, new KVStoreSerializer());
+  }
+
+  /**
+   * Opens (or creates) the store at the given path.
+   *
+   * <p>
+   * A store without a recorded version is stamped with {@link #STORE_VERSION}. A store with a
+   * different version is rejected, and the underlying DB is closed before the exception is
+   * thrown.
+   * </p>
+   *
+   * @param path Location of the LevelDB files.
+   * @param serializer Serializer used for all values written to the store.
+   * @throws UnsupportedStoreVersionException If the store was written with another version.
+   */
+  public LevelDB(File path, KVStoreSerializer serializer) throws Exception {
+    this.serializer = serializer;
+    this.types = new ConcurrentHashMap<>();
+
+    Options options = new Options();
+    options.createIfMissing(!path.exists());
+    this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options));
+
+    byte[] versionData = db().get(STORE_VERSION_KEY);
+    if (versionData != null) {
+      long version = serializer.deserializeLong(versionData);
+      if (version != STORE_VERSION) {
+        // The caller never gets an instance to close, so release the DB handle here.
+        close();
+        throw new UnsupportedStoreVersionException();
+      }
+    } else {
+      db().put(STORE_VERSION_KEY, serializer.serialize(STORE_VERSION));
+    }
+
+    Map<String, byte[]> aliases;
+    try {
+      aliases = get(TYPE_ALIASES_KEY, TypeAliases.class).aliases;
+    } catch (NoSuchElementException e) {
+      // No aliases stored yet.
+      aliases = new HashMap<>();
+    }
+    typeAliases = new ConcurrentHashMap<>(aliases);
+  }
+
+  /** Returns the stored metadata deserialized as the given class, or null if none is stored. */
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    try {
+      return get(METADATA_KEY, klass);
+    } catch (NoSuchElementException nsee) {
+      return null;
+    }
+  }
+
+  /** Stores the given metadata; a null value deletes any stored metadata. */
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    if (value != null) {
+      put(METADATA_KEY, value);
+    } else {
+      db().delete(METADATA_KEY);
+    }
+  }
+
+  /**
+   * Reads and deserializes the value stored under a raw DB key.
+   *
+   * @throws NoSuchElementException If there is no value for the key.
+   */
+  <T> T get(byte[] key, Class<T> klass) throws Exception {
+    byte[] data = db().get(key);
+    if (data == null) {
+      throw new NoSuchElementException(new String(key, UTF_8));
+    }
+    return serializer.deserialize(data, klass);
+  }
+
+  /** Serializes and stores a non-null value under a raw DB key. */
+  private void put(byte[] key, Object value) throws Exception {
+    Preconditions.checkArgument(value != null, "Null values are not allowed.");
+    db().put(key, serializer.serialize(value));
+  }
+
+  /**
+   * Reads the instance of the given type with the given natural key.
+   *
+   * @throws NoSuchElementException If no such instance is stored.
+   */
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
+    byte[] key = getTypeInfo(klass).naturalIndex().start(null, naturalKey);
+    return get(key, klass);
+  }
+
+  /**
+   * Writes the given value, updating every index of its type in a single write batch. The
+   * previously stored instance with the same natural key, if any, is handed to each index
+   * together with the new value.
+   */
+  @Override
+  public void write(Object value) throws Exception {
+    Preconditions.checkArgument(value != null, "Null values are not allowed.");
+    LevelDBTypeInfo ti = getTypeInfo(value.getClass());
+
+    try (WriteBatch batch = db().createWriteBatch()) {
+      byte[] data = serializer.serialize(value);
+      // Writes and deletes of the same type are serialized on the type info.
+      synchronized (ti) {
+        Object existing;
+        try {
+          existing = get(ti.naturalIndex().entityKey(null, value), value.getClass());
+        } catch (NoSuchElementException e) {
+          existing = null;
+        }
+
+        PrefixCache cache = new PrefixCache(value);
+        byte[] naturalKey = ti.naturalIndex().toKey(ti.naturalIndex().getValue(value));
+        for (LevelDBTypeInfo.Index idx : ti.indices()) {
+          byte[] prefix = cache.getPrefix(idx);
+          idx.add(batch, value, existing, data, naturalKey, prefix);
+        }
+        db().write(batch);
+      }
+    }
+  }
+
+  /**
+   * Deletes the instance of the given type with the given natural key from every index of the
+   * type. Does nothing if no such instance is stored.
+   */
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
+    try (WriteBatch batch = db().createWriteBatch()) {
+      LevelDBTypeInfo ti = getTypeInfo(type);
+      byte[] key = ti.naturalIndex().start(null, naturalKey);
+      synchronized (ti) {
+        byte[] data = db().get(key);
+        if (data != null) {
+          Object existing = serializer.deserialize(data, type);
+          PrefixCache cache = new PrefixCache(existing);
+          byte[] keyBytes = ti.naturalIndex().toKey(ti.naturalIndex().getValue(existing));
+          for (LevelDBTypeInfo.Index idx : ti.indices()) {
+            idx.remove(batch, existing, keyBytes, cache.getPrefix(idx));
+          }
+          db().write(batch);
+        }
+      }
+    } catch (NoSuchElementException nse) {
+      // Ignore.
+    }
+  }
+
+  /**
+   * Returns a configurable view over the stored instances of the given type. Failures while
+   * creating the underlying iterator are propagated as unchecked exceptions.
+   */
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type) throws Exception {
+    return new KVStoreView<T>(type) {
+      @Override
+      public Iterator<T> iterator() {
+        try {
+          return new LevelDBIterator<>(LevelDB.this, this);
+        } catch (Exception e) {
+          throw Throwables.propagate(e);
+        }
+      }
+    };
+  }
+
+  /** Returns the count kept at the end marker of the natural index of the given type. */
+  @Override
+  public long count(Class<?> type) throws Exception {
+    LevelDBTypeInfo.Index idx = getTypeInfo(type).naturalIndex();
+    return idx.getCount(idx.end(null));
+  }
+
+  /** Returns the count kept at the end marker of the given index for the given value. */
+  @Override
+  public long count(Class<?> type, String index, Object indexedValue) throws Exception {
+    LevelDBTypeInfo.Index idx = getTypeInfo(type).index(index);
+    return idx.getCount(idx.end(null, indexedValue));
+  }
+
+  /** Closes the underlying DB. Calling this on an already closed store does nothing. */
+  @Override
+  public void close() throws IOException {
+    DB _db = this._db.getAndSet(null);
+    if (_db == null) {
+      return;
+    }
+
+    try {
+      _db.close();
+    } catch (IOException ioe) {
+      throw ioe;
+    } catch (Exception e) {
+      throw new IOException(e.getMessage(), e);
+    }
+  }
+
+  /** Returns metadata about indices for the given type. */
+  LevelDBTypeInfo getTypeInfo(Class<?> type) throws Exception {
+    LevelDBTypeInfo ti = types.get(type);
+    if (ti == null) {
+      LevelDBTypeInfo tmp = new LevelDBTypeInfo(this, type, getTypeAlias(type));
+      // If another thread registered the type first, use its instance.
+      ti = types.putIfAbsent(type, tmp);
+      if (ti == null) {
+        ti = tmp;
+      }
+    }
+    return ti;
+  }
+
+  /**
+   * Try to avoid use-after close since that has the tendency of crashing the JVM. This doesn't
+   * prevent methods that retrieved the instance from using it after close, but hopefully will
+   * catch most cases; otherwise, we'll need some kind of locking.
+   */
+  DB db() {
+    DB _db = this._db.get();
+    if (_db == null) {
+      throw new IllegalStateException("DB is closed.");
+    }
+    return _db;
+  }
+
+  /**
+   * Returns the alias used as key prefix for the given class. A new alias is derived from the
+   * current number of aliases, and the updated alias map is written to the store.
+   */
+  private byte[] getTypeAlias(Class<?> klass) throws Exception {
+    byte[] alias = typeAliases.get(klass.getName());
+    if (alias == null) {
+      synchronized (typeAliases) {
+        byte[] tmp = String.valueOf(typeAliases.size()).getBytes(UTF_8);
+        alias = typeAliases.putIfAbsent(klass.getName(), tmp);
+        if (alias == null) {
+          alias = tmp;
+          put(TYPE_ALIASES_KEY, new TypeAliases(typeAliases));
+        }
+      }
+    }
+    return alias;
+  }
+
+  /** Needs to be public for Jackson. */
+  public static class TypeAliases {
+
+    public Map<String, byte[]> aliases;
+
+    TypeAliases(Map<String, byte[]> aliases) {
+      this.aliases = aliases;
+    }
+
+    TypeAliases() {
+      this(null);
+    }
+
+  }
+
+  /**
+   * Caches the child key prefixes computed for a single entity, keyed by parent index, so that
+   * each prefix is computed once even when several child indices share a parent.
+   */
+  private static class PrefixCache {
+
+    private final Object entity;
+    private final Map<LevelDBTypeInfo.Index, byte[]> prefixes;
+
+    PrefixCache(Object entity) {
+      this.entity = entity;
+      this.prefixes = new HashMap<>();
+    }
+
+    /** Returns the parent prefix for a child index, or null if the index is not a child. */
+    byte[] getPrefix(LevelDBTypeInfo.Index idx) throws Exception {
+      byte[] prefix = null;
+      if (idx.isChild()) {
+        prefix = prefixes.get(idx.parent());
+        if (prefix == null) {
+          prefix = idx.parent().childPrefix(idx.parent().getValue(entity));
+          prefixes.put(idx.parent(), prefix);
+        }
+      }
+      return prefix;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
new file mode 100644
index 0000000..a2181f3
--- /dev/null
+++ 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.iq80.leveldb.DBIterator;
+
+/**
+ * Iterator over the entries of a single LevelDB index, configured from a {@link KVStoreView}.
+ */
+class LevelDBIterator<T> implements KVStoreIterator<T> {
+
+  private final LevelDB db;
+  private final boolean ascending;
+  private final DBIterator it;
+  private final Class<T> type;
+  private final LevelDBTypeInfo ti;
+  private final LevelDBTypeInfo.Index index;
+  private final byte[] indexKeyPrefix;
+  private final byte[] end;
+  private final long max;
+
+  /** Whether {@link #next} holds the result of the latest lookup. */
+  private boolean checkedNext;
+  /** Value of the next element to return, or null if none has been loaded. */
+  private byte[] next;
+  private boolean closed;
+  /** Number of elements loaded so far, checked against {@link #max}. */
+  private long count;
+
+  /**
+   * Creates the iterator and positions the underlying DB iterator at the first entry described
+   * by the view's parameters (index, direction, parent, first / last values and skip count).
+   *
+   * @throws IllegalArgumentException If a child index is used without a parent value.
+   */
+  LevelDBIterator(LevelDB db, KVStoreView<T> params) throws Exception {
+    this.db = db;
+    this.ascending = params.ascending;
+    this.it = db.db().iterator();
+    this.type = params.type;
+    this.ti = db.getTypeInfo(type);
+    this.index = ti.index(params.index);
+    this.max = params.max;
+
+    Preconditions.checkArgument(!index.isChild() || params.parent != null,
+      "Cannot iterate over child index %s without parent value.", params.index);
+    byte[] parent = index.isChild() ? index.parent().childPrefix(params.parent) : null;
+
+    this.indexKeyPrefix = index.keyPrefix(parent);
+
+    byte[] firstKey;
+    if (params.first != null) {
+      if (ascending) {
+        firstKey = index.start(parent, params.first);
+      } else {
+        firstKey = index.end(parent, params.first);
+      }
+    } else if (ascending) {
+      firstKey = index.keyPrefix(parent);
+    } else {
+      firstKey = index.end(parent);
+    }
+    it.seek(firstKey);
+
+    byte[] end = null;
+    if (ascending) {
+      if (params.last != null) {
+        end = index.end(parent, params.last);
+      } else {
+        end = index.end(parent);
+      }
+    } else {
+      if (params.last != null) {
+        end = index.start(parent, params.last);
+      }
+      if (it.hasNext()) {
+        // When descending, the caller may have set up the start of iteration at a non-existent
+        // entry that is guaranteed to be after the desired entry. For example, if you have a
+        // compound key (a, b) where b is an integer, you may seek to the end of the elements that
+        // have the same "a" value by specifying Integer.MAX_VALUE for "b", and that value may not
+        // exist in the database. So need to check here whether the next value actually belongs to
+        // the set being returned by the iterator before advancing.
+        byte[] nextKey = it.peekNext().getKey();
+        if (compare(nextKey, indexKeyPrefix) <= 0) {
+          it.next();
+        }
+      }
+    }
+    this.end = end;
+
+    if (params.skip > 0) {
+      skip(params.skip);
+    }
+  }
+
+  /**
+   * Returns whether there is another element, loading it if needed. The iterator closes itself
+   * once it finds there are no more elements.
+   */
+  @Override
+  public boolean hasNext() {
+    if (!checkedNext && !closed) {
+      next = loadNext();
+      checkedNext = true;
+    }
+    if (!closed && next == null) {
+      try {
+        close();
+      } catch (IOException ioe) {
+        throw Throwables.propagate(ioe);
+      }
+    }
+    return next != null;
+  }
+
+  /**
+   * Returns the next element. For indices that store a copy of the data the stored value is
+   * deserialized directly; otherwise the stored value is used to look the entity up in the
+   * natural index.
+   *
+   * @throws NoSuchElementException If there are no more elements.
+   */
+  @Override
+  public T next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    checkedNext = false;
+
+    try {
+      T ret;
+      if (index == null || index.isCopy()) {
+        ret = db.serializer.deserialize(next, type);
+      } else {
+        byte[] key = ti.buildKey(false, ti.naturalIndex().keyPrefix(null), next);
+        ret = db.get(key, type);
+      }
+      next = null;
+      return ret;
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /** Not supported. */
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Returns up to {@code max} of the following elements; fewer if the iteration ends first. */
+  @Override
+  public List<T> next(int max) {
+    List<T> list = new ArrayList<>(max);
+    while (hasNext() && list.size() < max) {
+      list.add(next());
+    }
+    return list;
+  }
+
+  /**
+   * Skips up to {@code n} elements. An element already loaded by {@link #hasNext()} counts as
+   * the first skipped one, and end markers are stepped over without being counted.
+   *
+   * @return Whether there are elements left after skipping.
+   */
+  @Override
+  public boolean skip(long n) {
+    long skipped = 0;
+    while (skipped < n) {
+      if (next != null) {
+        checkedNext = false;
+        next = null;
+        skipped++;
+        continue;
+      }
+
+      boolean hasNext = ascending ? it.hasNext() : it.hasPrev();
+      if (!hasNext) {
+        checkedNext = true;
+        return false;
+      }
+
+      Map.Entry<byte[], byte[]> e = ascending ? it.next() : it.prev();
+      if (!isEndMarker(e.getKey())) {
+        skipped++;
+      }
+    }
+
+    return hasNext();
+  }
+
+  /** Closes the underlying DB iterator; closing an already closed iterator does nothing. */
+  @Override
+  public synchronized void close() throws IOException {
+    if (!closed) {
+      it.close();
+      closed = true;
+    }
+  }
+
+  /**
+   * Advances the underlying iterator to the next entry that belongs to the iteration.
+   *
+   * @return The value stored at that entry, or null when the limit has been reached or the
+   *   iteration has moved past the index or the end key.
+   */
+  private byte[] loadNext() {
+    if (count >= max) {
+      return null;
+    }
+
+    try {
+      while (true) {
+        boolean hasNext = ascending ? it.hasNext() : it.hasPrev();
+        if (!hasNext) {
+          return null;
+        }
+
+        Map.Entry<byte[], byte[]> nextEntry;
+        try {
+          // Avoid races if another thread is updating the DB.
+          nextEntry = ascending ? it.next() : it.prev();
+        } catch (NoSuchElementException e) {
+          return null;
+        }
+
+        byte[] nextKey = nextEntry.getKey();
+        // Next key is not part of the index, stop.
+        if (!startsWith(nextKey, indexKeyPrefix)) {
+          return null;
+        }
+
+        // If the next key is an end marker, then skip it.
+        if (isEndMarker(nextKey)) {
+          continue;
+        }
+
+        // If there's a known end key and iteration has gone past it, stop.
+        if (end != null) {
+          int comp = compare(nextKey, end) * (ascending ? 1 : -1);
+          if (comp > 0) {
+            return null;
+          }
+        }
+
+        count++;
+
+        // Next element is part of the iteration, return it.
+        return nextEntry.getValue();
+      }
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /** Returns whether {@code key} begins with all the bytes of {@code prefix}. */
+  @VisibleForTesting
+  static boolean startsWith(byte[] key, byte[] prefix) {
+    if (key.length < prefix.length) {
+      return false;
+    }
+
+    for (int i = 0; i < prefix.length; i++) {
+      if (key[i] != prefix[i]) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /** Returns whether the key ends with the key separator followed by the end marker. */
+  private boolean isEndMarker(byte[] key) {
+    return (key.length > 2 &&
+        key[key.length - 2] == LevelDBTypeInfo.KEY_SEPARATOR &&
+        key[key.length - 1] == LevelDBTypeInfo.END_MARKER[0]);
+  }
+
+  /**
+   * Compares two keys lexicographically, treating each byte as an unsigned value. LevelDB's
+   * default comparator orders keys the same way, so a signed comparison would disagree with the
+   * iteration order for any byte above 0x7f (e.g. in non-ASCII UTF-8 text).
+   *
+   * @return A negative number, zero or a positive number if {@code a} sorts before, the same as
+   *   or after {@code b}; when one key is a prefix of the other, the shorter key sorts first.
+   */
+  static int compare(byte[] a, byte[] b) {
+    int minLen = Math.min(a.length, b.length);
+    for (int i = 0; i < minLen; i++) {
+      int diff = (a[i] & 0xff) - (b[i] & 0xff);
+      if (diff != 0) {
+        return diff;
+      }
+    }
+
+    return a.length - b.length;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java
 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java
new file mode 100644
index 0000000..93aa0bb
--- /dev/null
+++ 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.lang.reflect.Array;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Holds metadata about app-specific types stored in LevelDB. Serves as a 
cache for data collected
+ * via reflection, to make it cheaper to access it multiple times.
+ *
+ * <p>
+ * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures
+ * that iteration over indices is easy, and that updating values in the store is not overly
+ * expensive. Of note, indices trade disk space for cheaper updates: every index entry is stored
+ * under its own key (one value per key) instead of keeping lists of pointers, which would be
+ * more expensive to update at runtime.
+ * </p>
+ *
+ * <p>
+ * Indentation defines when a sub-key lives under a parent key. In LevelDB, 
this means the full
+ * key would be the concatenation of everything up to that point in the 
hierarchy, with each
+ * component separated by a NULL byte.
+ * </p>
+ *
+ * <pre>
+ * +TYPE_NAME
+ *   NATURAL_INDEX
+ *     +NATURAL_KEY
+ *     -
+ *   -NATURAL_INDEX
+ *   INDEX_NAME
+ *     +INDEX_VALUE
+ *       +NATURAL_KEY
+ *     -INDEX_VALUE
+ *     .INDEX_VALUE
+ *       CHILD_INDEX_NAME
+ *         +CHILD_INDEX_VALUE
+ *           NATURAL_KEY_OR_DATA
+ *         -
+ *   -INDEX_NAME
+ * </pre>
+ *
+ * <p>
+ * Entity data (either the entity's natural key or a copy of the data) is 
stored in all keys
+ * that end with "+<something>". A count of all objects that match a 
particular top-level index
+ * value is kept at the end marker ("-<something>"). A count is also kept at 
the natural index's end
+ * marker, to make it easy to retrieve the number of all elements of a 
particular type.
+ * </p>
+ *
+ * <p>
+ * To illustrate, given a type "Foo", with a natural index and a second index 
called "bar", you'd
+ * have these keys and values in the store for two instances, one with natural 
key "key1" and the
+ * other "key2", both with value "yes" for "bar":
+ * </p>
+ *
+ * <pre>
+ * Foo __main__ +key1   [data for instance 1]
+ * Foo __main__ +key2   [data for instance 2]
+ * Foo __main__ -       [count of all Foo]
+ * Foo bar +yes +key1   [instance 1 key or data, depending on index type]
+ * Foo bar +yes +key2   [instance 2 key or data, depending on index type]
+ * Foo bar +yes -       [count of all Foo with "bar=yes" ]
+ * </pre>
+ *
+ * <p>
+ * Note that all indexed values are prepended with "+", even if the index 
itself does not have an
+ * explicit end marker. This allows for easily skipping to the end of an index 
by telling LevelDB
+ * to seek to the "phantom" end marker of the index. Throughout the code and 
comments, this part
+ * of the full LevelDB key is generally referred to as the "index value" of 
the entity.
+ * </p>
+ *
+ * <p>
+ * Child indices are stored after their parent index. In the example above, 
let's assume there is
+ * a child index "child", whose parent is "bar". If both instances have value 
"no" for this field,
+ * the data in the store would look something like the following:
+ * </p>
+ *
+ * <pre>
+ * ...
+ * Foo bar +yes -
+ * Foo bar .yes .child +no +key1   [instance 1 key or data, depending on index 
type]
+ * Foo bar .yes .child +no +key2   [instance 2 key or data, depending on index 
type]
+ * ...
+ * </pre>
+ */
+class LevelDBTypeInfo {
+
+  /** Last key component of the end marker entries ("-") of an index. */
+  static final byte[] END_MARKER = new byte[] { '-' };
+  /** Prefix ("+") prepended to indexed values in keys. */
+  static final byte ENTRY_PREFIX = (byte) '+';
+  /** NULL byte that separates the components of a key. */
+  static final byte KEY_SEPARATOR = 0x0;
+  // NOTE(review): presumably the key encodings of boolean values; the code that uses them is
+  // not in this part of the file. Made final so they cannot be reassigned at runtime.
+  static final byte TRUE = (byte) '1';
+  static final byte FALSE = (byte) '0';
+
+  /** Prefix (".") used by the key components of child (secondary) indices. */
+  private static final byte SECONDARY_IDX_PREFIX = (byte) '.';
+  // NOTE(review): the two markers and the hex table below look like helpers for encoding
+  // numeric values into keys; confirm against the code that uses them.
+  private static final byte POSITIVE_MARKER = (byte) '=';
+  private static final byte NEGATIVE_MARKER = (byte) '*';
+  private static final byte[] HEX_BYTES = new byte[] {
+    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
+  };
+
+  private final LevelDB db;
+  private final Class<?> type;
+  private final Map<String, Index> indices;
+  private final byte[] typePrefix;
+
+  /**
+   * Builds the index metadata for the given type.
+   *
+   * @param db The store that owns this type info.
+   * @param type The type whose indices are collected (through {@link KVTypeInfo}).
+   * @param alias The bytes used as the prefix of every key of this type.
+   */
+  LevelDBTypeInfo(LevelDB db, Class<?> type, byte[] alias) throws Exception {
+    this.db = db;
+    this.type = type;
+    this.typePrefix = alias;
+    this.indices = new HashMap<>();
+
+    KVTypeInfo info = new KVTypeInfo(type);
+
+    // Parent indices are registered first, so that child indices can look them up below.
+    info.indices()
+      .filter(idx -> idx.parent().isEmpty())
+      .forEach(idx ->
+        indices.put(idx.value(), new Index(idx, info.getAccessor(idx.value()), null)));
+
+    info.indices()
+      .filter(idx -> !idx.parent().isEmpty())
+      .forEach(idx -> {
+        Index parentIndex = indices.get(idx.parent());
+        indices.put(idx.value(), new Index(idx, info.getAccessor(idx.value()), parentIndex));
+      });
+  }
+
+  /** Returns the type described by this instance. */
+  Class<?> type() {
+    return this.type;
+  }
+
+  /** Returns the type prefix (the alias given at construction) used in keys of this type. */
+  byte[] keyPrefix() {
+    return this.typePrefix;
+  }
+
+  /** Returns the natural index of the type. */
+  Index naturalIndex() {
+    String naturalName = KVIndex.NATURAL_INDEX_NAME;
+    return index(naturalName);
+  }
+
+  /**
+   * Looks up an index of the type by name.
+   *
+   * @throws IllegalArgumentException If the type has no index with the given name.
+   */
+  Index index(String name) {
+    Index found = indices.get(name);
+    boolean exists = found != null;
+    Preconditions.checkArgument(exists, "Index %s does not exist for type %s.", name,
+      type.getName());
+    return found;
+  }
+
+  /** Returns all indices of the type, as a view of the internal index map's values. */
+  Collection<Index> indices() {
+    return this.indices.values();
+  }
+
+  /** Builds a key from the given components, starting with the type prefix. */
+  byte[] buildKey(byte[]... components) {
+    final boolean withTypePrefix = true;
+    return buildKey(withTypePrefix, components);
+  }
+
+  /**
+   * Concatenates the given components into a single key, separating them with
+   * {@code KEY_SEPARATOR}.
+   *
+   * @param addTypePrefix Whether the key starts with the type prefix (plus a separator).
+   * @param components The key components, in order.
+   * @return The assembled key.
+   */
+  byte[] buildKey(boolean addTypePrefix, byte[]... components) {
+    // Total size: all components, one separator between consecutive components and, when
+    // requested, the type prefix followed by its own separator.
+    int size = 0;
+    if (addTypePrefix) {
+      size += typePrefix.length + 1;
+    }
+    for (byte[] component : components) {
+      size += component.length;
+    }
+    size += components.length - 1;
+
+    byte[] key = new byte[size];
+    int pos = 0;
+
+    if (addTypePrefix) {
+      System.arraycopy(typePrefix, 0, key, 0, typePrefix.length);
+      key[typePrefix.length] = KEY_SEPARATOR;
+      pos = typePrefix.length + 1;
+    }
+
+    for (byte[] component : components) {
+      System.arraycopy(component, 0, key, pos, component.length);
+      pos += component.length;
+      // No separator is written once the key is full, i.e. after the last component.
+      if (pos < key.length) {
+        key[pos++] = KEY_SEPARATOR;
+      }
+    }
+
+    return key;
+  }
+
+  /**
+   * Models a single index in LevelDB. See top-level class's javadoc for a 
description of how the
+   * keys are generated.
+   */
+  class Index {
+
+    private final boolean copy;
+    private final boolean isNatural;
+    private final byte[] name;
+    private final KVTypeInfo.Accessor accessor;
+    private final Index parent;
+
+    /**
+     * Creates the metadata for one index of the enclosing type.
+     *
+     * @param self The annotation that declares the index.
+     * @param accessor Accessor used to read the index value from an entity.
+     * @param parent The parent index, or null if this is not a child index.
+     */
+    private Index(KVIndex self, KVTypeInfo.Accessor accessor, Index parent) {
+      byte[] name = self.value().getBytes(UTF_8);
+      if (parent != null) {
+        // Child index names carry the secondary index prefix in keys. The prefixed name used to
+        // be built and then dropped, so it was never actually used.
+        byte[] child = new byte[name.length + 1];
+        child[0] = SECONDARY_IDX_PREFIX;
+        System.arraycopy(name, 0, child, 1, name.length);
+        name = child;
+      }
+
+      this.name = name;
+      this.isNatural = self.value().equals(KVIndex.NATURAL_INDEX_NAME);
+      // The natural index always stores a copy of the data.
+      this.copy = isNatural || self.copy();
+      this.accessor = accessor;
+      this.parent = parent;
+    }
+
+    /** Whether entries of this index store the serialized data instead of the natural key. */
+    boolean isCopy() {
+      return copy;
+    }
+
+    /** Whether this index was created with a parent index. */
+    boolean isChild() {
+      return parent != null;
+    }
+
+    /** Returns the parent index, or null when this is not a child index. */
+    Index parent() {
+      return parent;
+    }
+
+    /**
+     * Creates a key prefix for child indices of this index. This allows the prefix to be
+     * calculated only once, avoiding redundant work when multiple child indices of the
+     * same parent index exist.
+     *
+     * @param value The value of this index that the child entries are stored under.
+     * @throws IllegalStateException if this index itself has a parent.
+     */
+    byte[] childPrefix(Object value) throws Exception {
+      Preconditions.checkState(parent == null, "Not a parent index.");
+      return buildKey(name, toParentKey(value));
+    }
+
+    /**
+     * Gets the index value for a particular entity (which is the value of the field or method
+     * tagged with the index annotation). This is used as part of the LevelDB key where the
+     * entity (or its id) is stored.
+     */
+    Object getValue(Object entity) throws Exception {
+      return accessor.get(entity);
+    }
+
+    /**
+     * Verifies that a parent prefix was supplied if, and only if, this is a child index.
+     *
+     * @throws IllegalStateException if the prefix does not match the kind of index.
+     */
+    private void checkParent(byte[] prefix) {
+      if (prefix == null) {
+        Preconditions.checkState(parent == null, "Parent prefix missing for child index.");
+        return;
+      }
+      Preconditions.checkState(parent != null, "Parent prefix provided for parent index.");
+    }
+
+    /**
+     * Builds the prefix shared by every key that belongs to this index.
+     *
+     * @param prefix The parent index prefix for a child index, null otherwise.
+     */
+    byte[] keyPrefix(byte[] prefix) {
+      checkParent(prefix);
+      if (parent == null) {
+        return buildKey(name);
+      }
+      return buildKey(false, prefix, name);
+    }
+
+    /**
+     * Builds the key at which ascending iteration starts for entities whose indexed field
+     * matches the given value.
+     *
+     * @param prefix The parent index prefix for a child index, null otherwise.
+     * @param value The indexed value to look for.
+     */
+    byte[] start(byte[] prefix, Object value) {
+      checkParent(prefix);
+      byte[] encoded = toKey(value);
+      if (parent == null) {
+        return buildKey(name, encoded);
+      }
+      return buildKey(false, prefix, name, encoded);
+    }
+
+    /**
+     * Builds the key of this index's end marker.
+     *
+     * @param prefix The parent index prefix for a child index, null otherwise.
+     */
+    byte[] end(byte[] prefix) {
+      checkParent(prefix);
+      if (parent == null) {
+        return buildKey(name, END_MARKER);
+      }
+      return buildKey(false, prefix, name, END_MARKER);
+    }
+
+    /**
+     * Builds the key of the end marker for the entries that have the given indexed value.
+     *
+     * @param prefix The parent index prefix for a child index, null otherwise.
+     * @param value The indexed value whose end marker is wanted.
+     */
+    byte[] end(byte[] prefix, Object value) throws Exception {
+      checkParent(prefix);
+      byte[] encoded = toKey(value);
+      if (parent == null) {
+        return buildKey(name, encoded, END_MARKER);
+      }
+      return buildKey(false, prefix, name, encoded, END_MARKER);
+    }
+
+    /**
+     * Builds the full key in this index that identifies the given entity.
+     *
+     * @param prefix The parent index prefix for a child index, null otherwise.
+     * @param entity The entity whose key is wanted.
+     * @throws NullPointerException if the entity has no value for this index.
+     */
+    byte[] entityKey(byte[] prefix, Object entity) throws Exception {
+      Object indexValue = getValue(entity);
+      // The name is kept as bytes; decode it so the message shows the index name rather than
+      // the array's identity string.
+      Preconditions.checkNotNull(indexValue, "Null index value for %s in type %s.",
+        new String(name, UTF_8), type.getName());
+      byte[] entityKey = start(prefix, indexValue);
+      if (!isNatural) {
+        // Entries of other indices are told apart by appending the entity's natural key.
+        entityKey = buildKey(false, entityKey, toKey(naturalIndex().getValue(entity)));
+      }
+      return entityKey;
+    }
+
+    /**
+     * Queues a change to the count stored under the given key. The current count is read with
+     * getCount(); if the adjusted count is not positive, the key is deleted instead.
+     *
+     * @param batch Write batch receiving the put or delete.
+     * @param key The key holding the count.
+     * @param delta The amount to add to the current count.
+     */
+    private void updateCount(WriteBatch batch, byte[] key, long delta) throws Exception {
+      long updated = getCount(key) + delta;
+      if (updated <= 0) {
+        batch.delete(key);
+        return;
+      }
+      batch.put(key, db.serializer.serialize(updated));
+    }
+
+    /**
+     * Queues, in the given batch, the writes needed to add, replace or remove this index's entry
+     * for an entity, and adjusts the counts kept by non-child indices.
+     *
+     * @param batch Write batch with other related changes.
+     * @param entity The entity being added or removed.
+     * @param existing The entity being replaced in the index, or null.
+     * @param data Serialized entity to store; null means the entry is being removed.
+     * @param naturalKey The value's natural key (to avoid re-computing it for every index).
+     * @param prefix The parent index prefix, if this is a child index.
+     * @throws NullPointerException if the entity has no value for this index.
+     */
+    private void addOrRemove(
+        WriteBatch batch,
+        Object entity,
+        Object existing,
+        byte[] data,
+        byte[] naturalKey,
+        byte[] prefix) throws Exception {
+      Object indexValue = getValue(entity);
+      // The name is kept as bytes; decode it so the message shows the index name rather than
+      // the array's identity string.
+      Preconditions.checkNotNull(indexValue, "Null index value for %s in type %s.",
+        new String(name, UTF_8), type.getName());
+
+      byte[] entityKey = start(prefix, indexValue);
+      if (!isNatural) {
+        entityKey = buildKey(false, entityKey, naturalKey);
+      }
+
+      boolean needCountUpdate = (existing == null);
+
+      // Check whether there's a need to update the index. The index needs to be updated in two
+      // cases:
+      //
+      // - There is no existing value for the entity, so a new index value will be added.
+      // - If there is a previously stored value for the entity, and the index value for the
+      //   current index does not match the new value, the old entry needs to be deleted and
+      //   the new one added.
+      //
+      // Natural indices don't need to be checked, because by definition both old and new entities
+      // will have the same key. The put() call is all that's needed in that case.
+      //
+      // Also check whether we need to update the counts. If the indexed value is changing, we
+      // need to decrement the count at the old index value, and the new indexed value count needs
+      // to be incremented.
+      if (existing != null && !isNatural) {
+        byte[] oldPrefix = null;
+        Object oldIndexedValue = getValue(existing);
+        // Index values may be arrays, which equals() compares by identity. deepEquals() compares
+        // their contents, so an unchanged array value is not mistaken for a changed one (which
+        // would bump the count of that value on every update).
+        boolean removeExisting = !java.util.Objects.deepEquals(indexValue, oldIndexedValue);
+        if (!removeExisting && isChild()) {
+          // Same value, but the entry also moves if the entity now sits under another parent.
+          oldPrefix = parent().childPrefix(parent().getValue(existing));
+          removeExisting = LevelDBIterator.compare(prefix, oldPrefix) != 0;
+        }
+
+        if (removeExisting) {
+          if (oldPrefix == null && isChild()) {
+            oldPrefix = parent().childPrefix(parent().getValue(existing));
+          }
+
+          byte[] oldKey = entityKey(oldPrefix, existing);
+          batch.delete(oldKey);
+
+          // If the indexed value has changed, we need to update the counts at the old and new
+          // end markers for the indexed value.
+          if (!isChild()) {
+            byte[] oldCountKey = end(null, oldIndexedValue);
+            updateCount(batch, oldCountKey, -1L);
+            needCountUpdate = true;
+          }
+        }
+      }
+
+      if (data != null) {
+        // Copy indices hold the serialized entity; the others hold a reference (natural key).
+        byte[] stored = copy ? data : naturalKey;
+        batch.put(entityKey, stored);
+      } else {
+        batch.delete(entityKey);
+      }
+
+      if (needCountUpdate && !isChild()) {
+        long delta = data != null ? 1L : -1L;
+        byte[] countKey = isNatural ? end(prefix) : end(prefix, indexValue);
+        updateCount(batch, countKey, delta);
+      }
+    }
+
+    /**
+     * Add an entry to the index.
+     *
+     * @param batch Write batch with other related changes.
+     * @param entity The entity being added to the index.
+     * @param existing The entity being replaced in the index, or null.
+     * @param data Serialized entity to store (when storing the entity, not a reference).
+     * @param naturalKey The value's natural key (to avoid re-computing it for every index).
+     * @param prefix The parent index prefix, if this is a child index.
+     */
+    void add(
+        WriteBatch batch,
+        Object entity,
+        Object existing,
+        byte[] data,
+        byte[] naturalKey,
+        byte[] prefix) throws Exception {
+      addOrRemove(batch, entity, existing, data, naturalKey, prefix);
+    }
+
+    /**
+     * Remove a value from the index.
+     *
+     * @param batch Write batch with other related changes.
+     * @param entity The entity being removed, to identify the index entry to modify.
+     * @param naturalKey The value's natural key (to avoid re-computing it for every index).
+     * @param prefix The parent index prefix, if this is a child index.
+     */
+    void remove(
+        WriteBatch batch,
+        Object entity,
+        byte[] naturalKey,
+        byte[] prefix) throws Exception {
+      // No existing entity and no data: addOrRemove() deletes the entry for this entity.
+      addOrRemove(batch, entity, null, null, naturalKey, prefix);
+    }
+
+    /** Reads the count stored under the given key; a missing entry counts as zero. */
+    long getCount(byte[] key) throws Exception {
+      byte[] data = db.db().get(key);
+      if (data == null) {
+        return 0;
+      }
+      return db.serializer.deserializeLong(data);
+    }
+
+    /** Translates a value into key bytes using SECONDARY_IDX_PREFIX as the prefix byte. */
+    byte[] toParentKey(Object value) {
+      return toKey(value, SECONDARY_IDX_PREFIX);
+    }
+
+    /** Translates a value into key bytes using ENTRY_PREFIX as the prefix byte. */
+    byte[] toKey(Object value) {
+      return toKey(value, ENTRY_PREFIX);
+    }
+
+    /**
+     * Translates a value to be used as part of the store key.
+     *
+     * Integral numbers are encoded as a string in a way that preserves lexicographical
+     * ordering. The string is prepended with a marker telling whether the number is negative
+     * or not ("*" for negative and "=" for zero and positive are used since "-" and "+" have the
+     * opposite of the desired order), and then the number is encoded into a hex string (so
+     * it occupies twice the number of bytes as the original type).
+     *
+     * Arrays are encoded by encoding each element separately, separated by KEY_SEPARATOR.
+     *
+     * @param value The value to encode: a String, Boolean, Integer, Long, Short, Byte, or an
+     *              array of those.
+     * @param prefix The byte placed in front of the encoded value.
+     * @throws IllegalArgumentException if the value has any other type.
+     */
+    byte[] toKey(Object value, byte prefix) {
+      final byte[] result;
+
+      if (value instanceof String) {
+        byte[] str = ((String) value).getBytes(UTF_8);
+        result = new byte[str.length + 1];
+        result[0] = prefix;
+        System.arraycopy(str, 0, result, 1, str.length);
+      } else if (value instanceof Boolean) {
+        result = new byte[] { prefix, (Boolean) value ? TRUE : FALSE };
+      } else if (value.getClass().isArray()) {
+        // NOTE(review): elements go through toKey(Object), so they are encoded with ENTRY_PREFIX
+        // and the prefix argument is not applied to array values; confirm this is intended.
+        int length = Array.getLength(value);
+        byte[][] components = new byte[length][];
+        for (int i = 0; i < length; i++) {
+          components[i] = toKey(Array.get(value, i));
+        }
+        result = buildKey(false, components);
+      } else {
+        int bytes;
+
+        if (value instanceof Integer) {
+          bytes = Integer.SIZE;
+        } else if (value instanceof Long) {
+          bytes = Long.SIZE;
+        } else if (value instanceof Short) {
+          bytes = Short.SIZE;
+        } else if (value instanceof Byte) {
+          bytes = Byte.SIZE;
+        } else {
+          throw new IllegalArgumentException(String.format("Type %s not allowed as key.",
+            value.getClass().getName()));
+        }
+
+        bytes = bytes / Byte.SIZE;
+
+        // Prefix byte, sign marker, then two hex digits per byte of the original type.
+        byte[] key = new byte[bytes * 2 + 2];
+        long longValue = ((Number) value).longValue();
+        key[0] = prefix;
+        // Zero belongs with the positive values: its digits are all zeros, so under the negative
+        // marker it would sort before every negative number.
+        key[1] = longValue >= 0 ? POSITIVE_MARKER : NEGATIVE_MARKER;
+
+        // Fill the hex digits from the least significant nibble, written right to left.
+        for (int i = 0; i < key.length - 2; i++) {
+          int masked = (int) ((longValue >>> (4 * i)) & 0xF);
+          key[key.length - i - 1] = HEX_BYTES[masked];
+        }
+
+        result = key;
+      }
+
+      return result;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/UnsupportedStoreVersionException.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/UnsupportedStoreVersionException.java
 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/UnsupportedStoreVersionException.java
new file mode 100644
index 0000000..75a33b7
--- /dev/null
+++ 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/UnsupportedStoreVersionException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.io.IOException;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * Exception thrown when the store implementation is not compatible with the underlying data.
+ */
+@Private
+public class UnsupportedStoreVersionException extends IOException {
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java 
b/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java
deleted file mode 100644
index d5938ac..0000000
--- 
a/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import java.util.Arrays;
-
-public class ArrayKeyIndexType {
-
-  @KVIndex
-  public int[] key;
-
-  @KVIndex("id")
-  public String[] id;
-
-  @Override
-  public boolean equals(Object o) {
-    if (o instanceof ArrayKeyIndexType) {
-      ArrayKeyIndexType other = (ArrayKeyIndexType) o;
-      return Arrays.equals(key, other.key) && Arrays.equals(id, other.id);
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    return key.hashCode();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayWrappersSuite.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayWrappersSuite.java 
b/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayWrappersSuite.java
deleted file mode 100644
index f9b4774..0000000
--- 
a/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayWrappersSuite.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class ArrayWrappersSuite {
-
-  @Test
-  public void testGenericArrayKey() {
-   byte[] b1 = new byte[] { 0x01, 0x02, 0x03 };
-   byte[] b2 = new byte[] { 0x01, 0x02 };
-   int[] i1 = new int[] { 1, 2, 3 };
-   int[] i2 = new int[] { 1, 2 };
-   String[] s1 = new String[] { "1", "2", "3" };
-   String[] s2 = new String[] { "1", "2" };
-
-   assertEquals(ArrayWrappers.forArray(b1), ArrayWrappers.forArray(b1));
-   assertNotEquals(ArrayWrappers.forArray(b1), ArrayWrappers.forArray(b2));
-   assertNotEquals(ArrayWrappers.forArray(b1), ArrayWrappers.forArray(i1));
-   assertNotEquals(ArrayWrappers.forArray(b1), ArrayWrappers.forArray(s1));
-
-   assertEquals(ArrayWrappers.forArray(i1), ArrayWrappers.forArray(i1));
-   assertNotEquals(ArrayWrappers.forArray(i1), ArrayWrappers.forArray(i2));
-   assertNotEquals(ArrayWrappers.forArray(i1), ArrayWrappers.forArray(b1));
-   assertNotEquals(ArrayWrappers.forArray(i1), ArrayWrappers.forArray(s1));
-
-   assertEquals(ArrayWrappers.forArray(s1), ArrayWrappers.forArray(s1));
-   assertNotEquals(ArrayWrappers.forArray(s1), ArrayWrappers.forArray(s2));
-   assertNotEquals(ArrayWrappers.forArray(s1), ArrayWrappers.forArray(b1));
-   assertNotEquals(ArrayWrappers.forArray(s1), ArrayWrappers.forArray(i1));
-
-   assertEquals(0, 
ArrayWrappers.forArray(b1).compareTo(ArrayWrappers.forArray(b1)));
-   assertTrue(ArrayWrappers.forArray(b1).compareTo(ArrayWrappers.forArray(b2)) 
> 0);
-
-   assertEquals(0, 
ArrayWrappers.forArray(i1).compareTo(ArrayWrappers.forArray(i1)));
-   assertTrue(ArrayWrappers.forArray(i1).compareTo(ArrayWrappers.forArray(i2)) 
> 0);
-
-   assertEquals(0, 
ArrayWrappers.forArray(s1).compareTo(ArrayWrappers.forArray(s1)));
-   assertTrue(ArrayWrappers.forArray(s1).compareTo(ArrayWrappers.forArray(s2)) 
> 0);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/test/java/org/apache/spark/kvstore/CustomType1.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/kvstore/CustomType1.java 
b/common/kvstore/src/test/java/org/apache/spark/kvstore/CustomType1.java
deleted file mode 100644
index afb72b8..0000000
--- a/common/kvstore/src/test/java/org/apache/spark/kvstore/CustomType1.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import com.google.common.base.Objects;
-
-public class CustomType1 {
-
-  @KVIndex
-  public String key;
-
-  @KVIndex("id")
-  public String id;
-
-  @KVIndex(value = "name", copy = true)
-  public String name;
-
-  @KVIndex("int")
-  public int num;
-
-  @KVIndex(value = "child", parent = "id")
-  public String child;
-
-  @Override
-  public boolean equals(Object o) {
-    if (o instanceof CustomType1) {
-      CustomType1 other = (CustomType1) o;
-      return id.equals(other.id) && name.equals(other.name);
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    return id.hashCode();
-  }
-
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(this)
-      .add("key", key)
-      .add("id", id)
-      .add("name", name)
-      .add("num", num)
-      .toString();
-  }
-
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to