[SPARK-21671][CORE] Move kvstore to "util" sub-package, add private annotation.
Author: Marcelo Vanzin <van...@cloudera.com> Closes #18886 from vanzin/SPARK-21671. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c1bfb49 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c1bfb49 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c1bfb49 Branch: refs/heads/master Commit: 2c1bfb497f31ff402796b57b617a9075c6044e4d Parents: 979bf94 Author: Marcelo Vanzin <van...@cloudera.com> Authored: Tue Aug 8 14:33:27 2017 -0700 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Tue Aug 8 14:33:27 2017 -0700 ---------------------------------------------------------------------- .../org/apache/spark/kvstore/ArrayWrappers.java | 213 -------- .../org/apache/spark/kvstore/InMemoryStore.java | 320 ------------ .../java/org/apache/spark/kvstore/KVIndex.java | 82 --- .../java/org/apache/spark/kvstore/KVStore.java | 126 ----- .../apache/spark/kvstore/KVStoreIterator.java | 47 -- .../apache/spark/kvstore/KVStoreSerializer.java | 86 ---- .../org/apache/spark/kvstore/KVStoreView.java | 123 ----- .../org/apache/spark/kvstore/KVTypeInfo.java | 154 ------ .../java/org/apache/spark/kvstore/LevelDB.java | 307 ----------- .../apache/spark/kvstore/LevelDBIterator.java | 277 ---------- .../apache/spark/kvstore/LevelDBTypeInfo.java | 511 ------------------- .../UnsupportedStoreVersionException.java | 27 - .../spark/util/kvstore/ArrayWrappers.java | 213 ++++++++ .../spark/util/kvstore/InMemoryStore.java | 320 ++++++++++++ .../org/apache/spark/util/kvstore/KVIndex.java | 85 +++ .../org/apache/spark/util/kvstore/KVStore.java | 129 +++++ .../spark/util/kvstore/KVStoreIterator.java | 50 ++ .../spark/util/kvstore/KVStoreSerializer.java | 89 ++++ .../apache/spark/util/kvstore/KVStoreView.java | 126 +++++ .../apache/spark/util/kvstore/KVTypeInfo.java | 157 ++++++ .../org/apache/spark/util/kvstore/LevelDB.java | 310 +++++++++++ .../spark/util/kvstore/LevelDBIterator.java | 277 ++++++++++ .../spark/util/kvstore/LevelDBTypeInfo.java | 511 +++++++++++++++++++ .../UnsupportedStoreVersionException.java | 30 ++ .../apache/spark/kvstore/ArrayKeyIndexType.java | 44 -- .../spark/kvstore/ArrayWrappersSuite.java | 59 --- .../org/apache/spark/kvstore/CustomType1.java | 63 --- .../apache/spark/kvstore/DBIteratorSuite.java | 504 ------------------ .../spark/kvstore/InMemoryIteratorSuite.java | 27 - .../spark/kvstore/InMemoryStoreSuite.java | 161 ------ .../apache/spark/kvstore/LevelDBBenchmark.java | 280 ---------- .../spark/kvstore/LevelDBIteratorSuite.java | 48 -- .../org/apache/spark/kvstore/LevelDBSuite.java | 286 ----------- .../spark/kvstore/LevelDBTypeInfoSuite.java | 207 -------- .../spark/util/kvstore/ArrayKeyIndexType.java | 44 ++ .../spark/util/kvstore/ArrayWrappersSuite.java | 59 +++ .../apache/spark/util/kvstore/CustomType1.java | 63 +++ .../spark/util/kvstore/DBIteratorSuite.java | 504 ++++++++++++++++++ .../util/kvstore/InMemoryIteratorSuite.java | 27 + .../spark/util/kvstore/InMemoryStoreSuite.java | 161 ++++++ .../spark/util/kvstore/LevelDBBenchmark.java | 280 ++++++++++ .../util/kvstore/LevelDBIteratorSuite.java | 48 ++ .../apache/spark/util/kvstore/LevelDBSuite.java | 286 +++++++++++ .../util/kvstore/LevelDBTypeInfoSuite.java | 207 ++++++++ 44 files changed, 3976 insertions(+), 3952 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/ArrayWrappers.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/ArrayWrappers.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/ArrayWrappers.java deleted file mode 100644 index 5efa842..0000000 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/ArrayWrappers.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.kvstore; - -import java.util.Arrays; - -import com.google.common.base.Preconditions; - -/** - * A factory for array wrappers so that arrays can be used as keys in a map, sorted or not. - * - * The comparator implementation makes two assumptions: - * - All elements are instances of Comparable - * - When comparing two arrays, they both contain elements of the same type in corresponding - * indices. - * - * Otherwise, ClassCastExceptions may occur. The equality method can compare any two arrays. - * - * This class is not efficient and is mostly meant to compare really small arrays, like those - * generally used as indices and keys in a KVStore. - */ -class ArrayWrappers { - - @SuppressWarnings("unchecked") - public static Comparable<Object> forArray(Object a) { - Preconditions.checkArgument(a.getClass().isArray()); - Comparable<?> ret; - if (a instanceof int[]) { - ret = new ComparableIntArray((int[]) a); - } else if (a instanceof long[]) { - ret = new ComparableLongArray((long[]) a); - } else if (a instanceof byte[]) { - ret = new ComparableByteArray((byte[]) a); - } else { - Preconditions.checkArgument(!a.getClass().getComponentType().isPrimitive()); - ret = new ComparableObjectArray((Object[]) a); - } - return (Comparable<Object>) ret; - } - - private static class ComparableIntArray implements Comparable<ComparableIntArray> { - - private final int[] array; - - ComparableIntArray(int[] array) { - this.array = array; - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof ComparableIntArray)) { - return false; - } - return Arrays.equals(array, ((ComparableIntArray) other).array); - } - - @Override - public int hashCode() { - int code = 0; - for (int i = 0; i < array.length; i++) { - code = (code * 31) + array[i]; - } - return code; - } - - @Override - public int compareTo(ComparableIntArray other) { - int len = Math.min(array.length, other.array.length); - for (int i = 0; i < len; i++) { - int diff = array[i] - other.array[i]; - if (diff != 0) { - return diff; - } - } - - return array.length - other.array.length; - } - } - - private static class ComparableLongArray implements Comparable<ComparableLongArray> { - - private final long[] array; - - ComparableLongArray(long[] array) { - this.array = array; - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof ComparableLongArray)) { - return false; - } - return Arrays.equals(array, ((ComparableLongArray) other).array); - } - - @Override - public int hashCode() { - int code = 0; - for (int i = 0; i < array.length; i++) { - code = (code * 31) + (int) array[i]; - } - return code; - } - - @Override - public int compareTo(ComparableLongArray other) { - int len = Math.min(array.length, other.array.length); - for (int i = 0; i < len; i++) { - long diff = array[i] - other.array[i]; - if (diff != 0) { - return diff > 0 ? 1 : -1; - } - } - - return array.length - other.array.length; - } - } - - private static class ComparableByteArray implements Comparable<ComparableByteArray> { - - private final byte[] array; - - ComparableByteArray(byte[] array) { - this.array = array; - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof ComparableByteArray)) { - return false; - } - return Arrays.equals(array, ((ComparableByteArray) other).array); - } - - @Override - public int hashCode() { - int code = 0; - for (int i = 0; i < array.length; i++) { - code = (code * 31) + array[i]; - } - return code; - } - - @Override - public int compareTo(ComparableByteArray other) { - int len = Math.min(array.length, other.array.length); - for (int i = 0; i < len; i++) { - int diff = array[i] - other.array[i]; - if (diff != 0) { - return diff; - } - } - - return array.length - other.array.length; - } - } - - private static class ComparableObjectArray implements Comparable<ComparableObjectArray> { - - private final Object[] array; - - ComparableObjectArray(Object[] array) { - this.array = array; - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof ComparableObjectArray)) { - return false; - } - return Arrays.equals(array, ((ComparableObjectArray) other).array); - } - - @Override - public int hashCode() { - int code = 0; - for (int i = 0; i < array.length; i++) { - code = (code * 31) + array[i].hashCode(); - } - return code; - } - - @Override - @SuppressWarnings("unchecked") - public int compareTo(ComparableObjectArray other) { - int len = Math.min(array.length, other.array.length); - for (int i = 0; i < len; i++) { - int diff = ((Comparable<Object>) array[i]).compareTo((Comparable<Object>) other.array[i]); - if (diff != 0) { - return diff; - } - } - - return array.length - other.array.length; - } - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java deleted file mode 100644 index f3aeb82..0000000 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java +++ /dev/null @@ -1,320 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.kvstore; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; - -import org.apache.spark.annotation.Private; - -/** - * Implementation of KVStore that keeps data deserialized in memory. This store does not index - * data; instead, whenever iterating over an indexed field, the stored data is copied and sorted - * according to the index. This saves memory but makes iteration more expensive. - */ -@Private -public class InMemoryStore implements KVStore { - - private Object metadata; - private ConcurrentMap<Class<?>, InstanceList> data = new ConcurrentHashMap<>(); - - @Override - public <T> T getMetadata(Class<T> klass) { - return klass.cast(metadata); - } - - @Override - public void setMetadata(Object value) { - this.metadata = value; - } - - @Override - public long count(Class<?> type) { - InstanceList list = data.get(type); - return list != null ? list.size() : 0; - } - - @Override - public long count(Class<?> type, String index, Object indexedValue) throws Exception { - InstanceList list = data.get(type); - int count = 0; - Object comparable = asKey(indexedValue); - KVTypeInfo.Accessor accessor = list.getIndexAccessor(index); - for (Object o : view(type)) { - if (Objects.equal(comparable, asKey(accessor.get(o)))) { - count++; - } - } - return count; - } - - @Override - public <T> T read(Class<T> klass, Object naturalKey) { - InstanceList list = data.get(klass); - Object value = list != null ? list.get(naturalKey) : null; - if (value == null) { - throw new NoSuchElementException(); - } - return klass.cast(value); - } - - @Override - public void write(Object value) throws Exception { - InstanceList list = data.computeIfAbsent(value.getClass(), key -> { - try { - return new InstanceList(key); - } catch (Exception e) { - throw Throwables.propagate(e); - } - }); - list.put(value); - } - - @Override - public void delete(Class<?> type, Object naturalKey) { - InstanceList list = data.get(type); - if (list != null) { - list.delete(naturalKey); - } - } - - @Override - public <T> KVStoreView<T> view(Class<T> type){ - InstanceList list = data.get(type); - return list != null ? list.view(type) - : new InMemoryView<>(type, Collections.<T>emptyList(), null); - } - - @Override - public void close() { - metadata = null; - data.clear(); - } - - @SuppressWarnings("unchecked") - private static Comparable<Object> asKey(Object in) { - if (in.getClass().isArray()) { - in = ArrayWrappers.forArray(in); - } - return (Comparable<Object>) in; - } - - private static class InstanceList { - - private final KVTypeInfo ti; - private final KVTypeInfo.Accessor naturalKey; - private final ConcurrentMap<Comparable<Object>, Object> data; - - private int size; - - private InstanceList(Class<?> type) throws Exception { - this.ti = new KVTypeInfo(type); - this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME); - this.data = new ConcurrentHashMap<>(); - this.size = 0; - } - - KVTypeInfo.Accessor getIndexAccessor(String indexName) { - return ti.getAccessor(indexName); - } - - public Object get(Object key) { - return data.get(asKey(key)); - } - - public void put(Object value) throws Exception { - Preconditions.checkArgument(ti.type().equals(value.getClass()), - "Unexpected type: %s", value.getClass()); - if (data.put(asKey(naturalKey.get(value)), value) == null) { - size++; - } - } - - public void delete(Object key) { - if (data.remove(asKey(key)) != null) { - size--; - } - } - - public int size() { - return size; - } - - @SuppressWarnings("unchecked") - public <T> InMemoryView<T> view(Class<T> type) { - Preconditions.checkArgument(ti.type().equals(type), "Unexpected type: %s", type); - Collection<T> all = (Collection<T>) data.values(); - return new InMemoryView(type, all, ti); - } - - } - - private static class InMemoryView<T> extends KVStoreView<T> { - - private final Collection<T> elements; - private final KVTypeInfo ti; - private final KVTypeInfo.Accessor natural; - - InMemoryView(Class<T> type, Collection<T> elements, KVTypeInfo ti) { - super(type); - this.elements = elements; - this.ti = ti; - this.natural = ti != null ? ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) : null; - } - - @Override - public Iterator<T> iterator() { - if (elements.isEmpty()) { - return new InMemoryIterator<>(elements.iterator()); - } - - try { - KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null; - int modifier = ascending ? 1 : -1; - - final List<T> sorted = copyElements(); - Collections.sort(sorted, (e1, e2) -> modifier * compare(e1, e2, getter)); - Stream<T> stream = sorted.stream(); - - if (first != null) { - stream = stream.filter(e -> modifier * compare(e, getter, first) >= 0); - } - - if (last != null) { - stream = stream.filter(e -> modifier * compare(e, getter, last) <= 0); - } - - if (skip > 0) { - stream = stream.skip(skip); - } - - if (max < sorted.size()) { - stream = stream.limit((int) max); - } - - return new InMemoryIterator<>(stream.iterator()); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - /** - * Create a copy of the input elements, filtering the values for child indices if needed. - */ - private List<T> copyElements() { - if (parent != null) { - KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index); - Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index."); - - return elements.stream() - .filter(e -> compare(e, parentGetter, parent) == 0) - .collect(Collectors.toList()); - } else { - return new ArrayList<>(elements); - } - } - - private int compare(T e1, T e2, KVTypeInfo.Accessor getter) { - try { - int diff = compare(e1, getter, getter.get(e2)); - if (diff == 0 && getter != natural) { - diff = compare(e1, natural, natural.get(e2)); - } - return diff; - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - private int compare(T e1, KVTypeInfo.Accessor getter, Object v2) { - try { - return asKey(getter.get(e1)).compareTo(asKey(v2)); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - } - - private static class InMemoryIterator<T> implements KVStoreIterator<T> { - - private final Iterator<T> iter; - - InMemoryIterator(Iterator<T> iter) { - this.iter = iter; - } - - @Override - public boolean hasNext() { - return iter.hasNext(); - } - - @Override - public T next() { - return iter.next(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - @Override - public List<T> next(int max) { - List<T> list = new ArrayList<>(max); - while (hasNext() && list.size() < max) { - list.add(next()); - } - return list; - } - - @Override - public boolean skip(long n) { - long skipped = 0; - while (skipped < n) { - if (hasNext()) { - next(); - skipped++; - } else { - return false; - } - } - - return hasNext(); - } - - @Override - public void close() { - // no op. - } - - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/KVIndex.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVIndex.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVIndex.java deleted file mode 100644 index 0cffefe..0000000 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVIndex.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.kvstore; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - * Tags a field to be indexed when storing an object. - * - * <p> - * Types are required to have a natural index that uniquely identifies instances in the store. - * The default value of the annotation identifies the natural index for the type. - * </p> - * - * <p> - * Indexes allow for more efficient sorting of data read from the store. By annotating a field or - * "getter" method with this annotation, an index will be created that will provide sorting based on - * the string value of that field. - * </p> - * - * <p> - * Note that creating indices means more space will be needed, and maintenance operations like - * updating or deleting a value will become more expensive. - * </p> - * - * <p> - * Indices are restricted to String, integral types (byte, short, int, long, boolean), and arrays - * of those values. - * </p> - */ -@Retention(RetentionPolicy.RUNTIME) -@Target({ElementType.FIELD, ElementType.METHOD}) -public @interface KVIndex { - - String NATURAL_INDEX_NAME = "__main__"; - - /** - * The name of the index to be created for the annotated entity. Must be unique within - * the class. Index names are not allowed to start with an underscore (that's reserved for - * internal use). The default value is the natural index name (which is always a copy index - * regardless of the annotation's values). - */ - String value() default NATURAL_INDEX_NAME; - - /** - * The name of the parent index of this index. By default there is no parent index, so the - * generated data can be retrieved without having to provide a parent value. - * - * <p> - * If a parent index is defined, iterating over the data using the index will require providing - * a single value for the parent index. This serves as a rudimentary way to provide relationships - * between entities in the store. - * </p> - */ - String parent() default ""; - - /** - * Whether to copy the instance's data to the index, instead of just storing a pointer to the - * data. The default behavior is to just store a reference; that saves disk space but is slower - * to read, since there's a level of indirection. - */ - boolean copy() default false; - -} http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java deleted file mode 100644 index c7808ea..0000000 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.kvstore; - -import java.io.Closeable; - -/** - * Abstraction for a local key/value store for storing app data. - * - * <p> - * There are two main features provided by the implementations of this interface: - * </p> - * - * <h3>Serialization</h3> - * - * <p> - * If the underlying data store requires serialization, data will be serialized to and deserialized - * using a {@link KVStoreSerializer}, which can be customized by the application. The serializer is - * based on Jackson, so it supports all the Jackson annotations for controlling the serialization of - * app-defined types. - * </p> - * - * <p> - * Data is also automatically compressed to save disk space. - * </p> - * - * <h3>Automatic Key Management</h3> - * - * <p> - * When using the built-in key management, the implementation will automatically create unique - * keys for each type written to the store. Keys are based on the type name, and always start - * with the "+" prefix character (so that it's easy to use both manual and automatic key - * management APIs without conflicts). - * </p> - * - * <p> - * Another feature of automatic key management is indexing; by annotating fields or methods of - * objects written to the store with {@link KVIndex}, indices are created to sort the data - * by the values of those properties. This makes it possible to provide sorting without having - * to load all instances of those types from the store. - * </p> - * - * <p> - * KVStore instances are thread-safe for both reads and writes. - * </p> - */ -public interface KVStore extends Closeable { - - /** - * Returns app-specific metadata from the store, or null if it's not currently set. - * - * <p> - * The metadata type is application-specific. This is a convenience method so that applications - * don't need to define their own keys for this information. - * </p> - */ - <T> T getMetadata(Class<T> klass) throws Exception; - - /** - * Writes the given value in the store metadata key. - */ - void setMetadata(Object value) throws Exception; - - /** - * Read a specific instance of an object. - * - * @param naturalKey The object's "natural key", which uniquely identifies it. Null keys - * are not allowed. - * @throws java.util.NoSuchElementException If an element with the given key does not exist. - */ - <T> T read(Class<T> klass, Object naturalKey) throws Exception; - - /** - * Writes the given object to the store, including indexed fields. Indices are updated based - * on the annotated fields of the object's class. - * - * <p> - * Writes may be slower when the object already exists in the store, since it will involve - * updating existing indices. - * </p> - * - * @param value The object to write. - */ - void write(Object value) throws Exception; - - /** - * Removes an object and all data related to it, like index entries, from the store. - * - * @param type The object's type. - * @param naturalKey The object's "natural key", which uniquely identifies it. Null keys - * are not allowed. - * @throws java.util.NoSuchElementException If an element with the given key does not exist. - */ - void delete(Class<?> type, Object naturalKey) throws Exception; - - /** - * Returns a configurable view for iterating over entities of the given type. - */ - <T> KVStoreView<T> view(Class<T> type) throws Exception; - - /** - * Returns the number of items of the given type currently in the store. - */ - long count(Class<?> type) throws Exception; - - /** - * Returns the number of items of the given type which match the given indexed value. - */ - long count(Class<?> type, String index, Object indexedValue) throws Exception; - -} http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreIterator.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreIterator.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreIterator.java deleted file mode 100644 index 3efdec9..0000000 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreIterator.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.kvstore; - -import java.util.Iterator; -import java.util.List; - -/** - * An iterator for KVStore. - * - * <p> - * Iterators may keep references to resources that need to be closed. It's recommended that users - * explicitly close iterators after they're used. - * </p> - */ -public interface KVStoreIterator<T> extends Iterator<T>, AutoCloseable { - - /** - * Retrieve multiple elements from the store. - * - * @param max Maximum number of elements to retrieve. - */ - List<T> next(int max); - - /** - * Skip in the iterator. - * - * @return Whether there are items left after skipping. - */ - boolean skip(long n); - -} http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java deleted file mode 100644 index b84ec91..0000000 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.kvstore; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; -import static java.nio.charset.StandardCharsets.UTF_8; - -import com.fasterxml.jackson.databind.ObjectMapper; - -/** - * Serializer used to translate between app-defined types and the LevelDB store. - * - * <p> - * The serializer is based on Jackson, so values are written as JSON. It also allows "naked strings" - * and integers to be written as values directly, which will be written as UTF-8 strings. - * </p> - */ -public class KVStoreSerializer { - - /** - * Object mapper used to process app-specific types. If an application requires a specific - * configuration of the mapper, it can subclass this serializer and add custom configuration - * to this object. - */ - protected final ObjectMapper mapper; - - public KVStoreSerializer() { - this.mapper = new ObjectMapper(); - } - - public final byte[] serialize(Object o) throws Exception { - if (o instanceof String) { - return ((String) o).getBytes(UTF_8); - } else { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - GZIPOutputStream out = new GZIPOutputStream(bytes); - try { - mapper.writeValue(out, o); - } finally { - out.close(); - } - return bytes.toByteArray(); - } - } - - @SuppressWarnings("unchecked") - public final <T> T deserialize(byte[] data, Class<T> klass) throws Exception { - if (klass.equals(String.class)) { - return (T) new String(data, UTF_8); - } else { - GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data)); - try { - return mapper.readValue(in, klass); - } finally { - in.close(); - } - } - } - - final byte[] serialize(long value) { - return String.valueOf(value).getBytes(UTF_8); - } - - final long deserializeLong(byte[] data) { - return Long.parseLong(new String(data, UTF_8)); - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java deleted file mode 100644 index 8cd1f52..0000000 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.kvstore; - -import com.google.common.base.Preconditions; - -/** - * A configurable view that allows iterating over values in a {@link KVStore}. - * - * <p> - * The different methods can be used to configure the behavior of the iterator. Calling the same - * method multiple times is allowed; the most recent value will be used. - * </p> - * - * <p> - * The iterators returned by this view are of type {@link KVStoreIterator}; they auto-close - * when used in a for loop that exhausts their contents, but when used manually, they need - * to be closed explicitly unless all elements are read. - * </p> - */ -public abstract class KVStoreView<T> implements Iterable<T> { - - final Class<T> type; - - boolean ascending = true; - String index = KVIndex.NATURAL_INDEX_NAME; - Object first = null; - Object last = null; - Object parent = null; - long skip = 0L; - long max = Long.MAX_VALUE; - - public KVStoreView(Class<T> type) { - this.type = type; - } - - /** - * Reverses the order of iteration. By default, iterates in ascending order. - */ - public KVStoreView<T> reverse() { - ascending = !ascending; - return this; - } - - /** - * Iterates according to the given index. - */ - public KVStoreView<T> index(String name) { - this.index = Preconditions.checkNotNull(name); - return this; - } - - /** - * Defines the value of the parent index when iterating over a child index. Only elements that - * match the parent index's value will be included in the iteration. - * - * <p> - * Required for iterating over child indices, will generate an error if iterating over a - * parent-less index. - * </p> - */ - public KVStoreView<T> parent(Object value) { - this.parent = value; - return this; - } - - /** - * Iterates starting at the given value of the chosen index (inclusive). - */ - public KVStoreView<T> first(Object value) { - this.first = value; - return this; - } - - /** - * Stops iteration at the given value of the chosen index (inclusive). - */ - public KVStoreView<T> last(Object value) { - this.last = value; - return this; - } - - /** - * Stops iteration after a number of elements has been retrieved. - */ - public KVStoreView<T> max(long max) { - Preconditions.checkArgument(max > 0L, "max must be positive."); - this.max = max; - return this; - } - - /** - * Skips a number of elements at the start of iteration. Skipped elements are not accounted - * when using {@link #max(long)}. - */ - public KVStoreView<T> skip(long n) { - this.skip = n; - return this; - } - - /** - * Returns an iterator for the current configuration. - */ - public KVStoreIterator<T> closeableIterator() throws Exception { - return (KVStoreIterator<T>) iterator(); - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java deleted file mode 100644 index e3e61e0..0000000 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.kvstore; - -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.util.HashMap; -import java.util.Map; -import java.util.stream.Stream; - -import com.google.common.base.Preconditions; - -/** - * Wrapper around types managed in a KVStore, providing easy access to their indexed fields. - */ -public class KVTypeInfo { - - private final Class<?> type; - private final Map<String, KVIndex> indices; - private final Map<String, Accessor> accessors; - - public KVTypeInfo(Class<?> type) throws Exception { - this.type = type; - this.accessors = new HashMap<>(); - this.indices = new HashMap<>(); - - for (Field f : type.getDeclaredFields()) { - KVIndex idx = f.getAnnotation(KVIndex.class); - if (idx != null) { - checkIndex(idx, indices); - indices.put(idx.value(), idx); - f.setAccessible(true); - accessors.put(idx.value(), new FieldAccessor(f)); - } - } - - for (Method m : type.getDeclaredMethods()) { - KVIndex idx = m.getAnnotation(KVIndex.class); - if (idx != null) { - checkIndex(idx, indices); - Preconditions.checkArgument(m.getParameterTypes().length == 0, - "Annotated method %s::%s should not have any parameters.", type.getName(), m.getName()); - indices.put(idx.value(), idx); - m.setAccessible(true); - accessors.put(idx.value(), new MethodAccessor(m)); - } - } - - Preconditions.checkArgument(indices.containsKey(KVIndex.NATURAL_INDEX_NAME), - "No natural index defined for type %s.", type.getName()); - Preconditions.checkArgument(indices.get(KVIndex.NATURAL_INDEX_NAME).parent().isEmpty(), - "Natural index of %s cannot have a parent.", type.getName()); - - for (KVIndex idx : indices.values()) { - if (!idx.parent().isEmpty()) { - KVIndex parent = indices.get(idx.parent()); - Preconditions.checkArgument(parent != null, - "Cannot find parent %s of index %s.", idx.parent(), idx.value()); - Preconditions.checkArgument(parent.parent().isEmpty(), - "Parent index %s of index %s cannot be itself a child index.", idx.parent(), idx.value()); - } - } - } - - private void checkIndex(KVIndex idx, Map<String, KVIndex> indices) { - Preconditions.checkArgument(idx.value() != null && !idx.value().isEmpty(), - "No name provided for index in type %s.", type.getName()); - Preconditions.checkArgument( - !idx.value().startsWith("_") || idx.value().equals(KVIndex.NATURAL_INDEX_NAME), - "Index name %s (in type %s) is not allowed.", idx.value(), type.getName()); - Preconditions.checkArgument(idx.parent().isEmpty() || !idx.parent().equals(idx.value()), - "Index %s cannot be parent of itself.", idx.value()); - Preconditions.checkArgument(!indices.containsKey(idx.value()), - "Duplicate index %s for type %s.", idx.value(), type.getName()); - } - - public Class<?> type() { - return type; - } - - public Object getIndexValue(String indexName, Object instance) throws Exception { - return getAccessor(indexName).get(instance); - } - - public Stream<KVIndex> indices() { - return indices.values().stream(); - } - - Accessor getAccessor(String indexName) { - Accessor a = accessors.get(indexName); - Preconditions.checkArgument(a != null, "No index %s.", indexName); - return a; - } - - Accessor getParentAccessor(String indexName) { - KVIndex index = indices.get(indexName); - return index.parent().isEmpty() ? null : getAccessor(index.parent()); - } - - /** - * Abstracts the difference between invoking a Field and a Method. - */ - interface Accessor { - - Object get(Object instance) throws Exception; - - } - - private class FieldAccessor implements Accessor { - - private final Field field; - - FieldAccessor(Field field) { - this.field = field; - } - - @Override - public Object get(Object instance) throws Exception { - return field.get(instance); - } - - } - - private class MethodAccessor implements Accessor { - - private final Method method; - - MethodAccessor(Method method) { - this.method = method; - } - - @Override - public Object get(Object instance) throws Exception { - return method.invoke(instance); - } - - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java deleted file mode 100644 index 2714135..0000000 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java +++ /dev/null @@ -1,307 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.kvstore; - -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicReference; -import static java.nio.charset.StandardCharsets.UTF_8; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import org.fusesource.leveldbjni.JniDBFactory; -import org.iq80.leveldb.DB; -import org.iq80.leveldb.Options; -import org.iq80.leveldb.WriteBatch; - -/** - * Implementation of KVStore that uses LevelDB as the underlying data store. - */ -public class LevelDB implements KVStore { - - @VisibleForTesting - static final long STORE_VERSION = 1L; - - @VisibleForTesting - static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8); - - /** DB key where app metadata is stored. */ - private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8); - - /** DB key where type aliases are stored. */ - private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8); - - final AtomicReference<DB> _db; - final KVStoreSerializer serializer; - - /** - * Keep a mapping of class names to a shorter, unique ID managed by the store. This serves two - * purposes: make the keys stored on disk shorter, and spread out the keys, since class names - * will often have a long, redundant prefix (think "org.apache.spark."). - */ - private final ConcurrentMap<String, byte[]> typeAliases; - private final ConcurrentMap<Class<?>, LevelDBTypeInfo> types; - - public LevelDB(File path) throws Exception { - this(path, new KVStoreSerializer()); - } - - public LevelDB(File path, KVStoreSerializer serializer) throws Exception { - this.serializer = serializer; - this.types = new ConcurrentHashMap<>(); - - Options options = new Options(); - options.createIfMissing(!path.exists()); - this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options)); - - byte[] versionData = db().get(STORE_VERSION_KEY); - if (versionData != null) { - long version = serializer.deserializeLong(versionData); - if (version != STORE_VERSION) { - throw new UnsupportedStoreVersionException(); - } - } else { - db().put(STORE_VERSION_KEY, serializer.serialize(STORE_VERSION)); - } - - Map<String, byte[]> aliases; - try { - aliases = get(TYPE_ALIASES_KEY, TypeAliases.class).aliases; - } catch (NoSuchElementException e) { - aliases = new HashMap<>(); - } - typeAliases = new ConcurrentHashMap<>(aliases); - } - - @Override - public <T> T getMetadata(Class<T> klass) throws Exception { - try { - return get(METADATA_KEY, klass); - } catch (NoSuchElementException nsee) { - return null; - } - } - - @Override - public void setMetadata(Object value) throws Exception { - if (value != null) { - put(METADATA_KEY, value); - } else { - db().delete(METADATA_KEY); - } - } - - <T> T get(byte[] key, Class<T> klass) throws Exception { - byte[] data = db().get(key); - if (data == null) { - throw new NoSuchElementException(new String(key, UTF_8)); - } - return serializer.deserialize(data, klass); - } - - private void put(byte[] key, Object value) throws Exception { - Preconditions.checkArgument(value != null, "Null values are not allowed."); - db().put(key, serializer.serialize(value)); - } - - @Override - public <T> T read(Class<T> klass, Object naturalKey) throws Exception { - Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed."); - byte[] key = getTypeInfo(klass).naturalIndex().start(null, naturalKey); - return get(key, klass); - } - - @Override - public void write(Object value) throws Exception { - Preconditions.checkArgument(value != null, "Null values are not allowed."); - LevelDBTypeInfo ti = getTypeInfo(value.getClass()); - - try (WriteBatch batch = db().createWriteBatch()) { - byte[] data = serializer.serialize(value); - synchronized (ti) { - Object existing; - try { - existing = get(ti.naturalIndex().entityKey(null, value), value.getClass()); - } catch (NoSuchElementException e) { - existing = null; - } - - PrefixCache cache = new PrefixCache(value); - byte[] naturalKey = ti.naturalIndex().toKey(ti.naturalIndex().getValue(value)); - for (LevelDBTypeInfo.Index idx : ti.indices()) { - byte[] prefix = cache.getPrefix(idx); - idx.add(batch, value, existing, data, naturalKey, prefix); - } - db().write(batch); - } - } - } - - @Override - public void delete(Class<?> type, Object naturalKey) throws Exception { - Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed."); - try (WriteBatch batch = db().createWriteBatch()) { - LevelDBTypeInfo ti = getTypeInfo(type); - byte[] key = ti.naturalIndex().start(null, naturalKey); - synchronized (ti) { - byte[] data = db().get(key); - if (data != null) { - Object existing = serializer.deserialize(data, type); - PrefixCache cache = new PrefixCache(existing); - byte[] keyBytes = ti.naturalIndex().toKey(ti.naturalIndex().getValue(existing)); - for (LevelDBTypeInfo.Index idx : ti.indices()) { - idx.remove(batch, existing, keyBytes, cache.getPrefix(idx)); - } - db().write(batch); - } - } - } catch (NoSuchElementException nse) { - // Ignore. - } - } - - @Override - public <T> KVStoreView<T> view(Class<T> type) throws Exception { - return new KVStoreView<T>(type) { - @Override - public Iterator<T> iterator() { - try { - return new LevelDBIterator<>(LevelDB.this, this); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - }; - } - - @Override - public long count(Class<?> type) throws Exception { - LevelDBTypeInfo.Index idx = getTypeInfo(type).naturalIndex(); - return idx.getCount(idx.end(null)); - } - - @Override - public long count(Class<?> type, String index, Object indexedValue) throws Exception { - LevelDBTypeInfo.Index idx = getTypeInfo(type).index(index); - return idx.getCount(idx.end(null, indexedValue)); - } - - @Override - public void close() throws IOException { - DB _db = this._db.getAndSet(null); - if (_db == null) { - return; - } - - try { - _db.close(); - } catch (IOException ioe) { - throw ioe; - } catch (Exception e) { - throw new IOException(e.getMessage(), e); - } - } - - /** Returns metadata about indices for the given type. */ - LevelDBTypeInfo getTypeInfo(Class<?> type) throws Exception { - LevelDBTypeInfo ti = types.get(type); - if (ti == null) { - LevelDBTypeInfo tmp = new LevelDBTypeInfo(this, type, getTypeAlias(type)); - ti = types.putIfAbsent(type, tmp); - if (ti == null) { - ti = tmp; - } - } - return ti; - } - - /** - * Try to avoid use-after close since that has the tendency of crashing the JVM. This doesn't - * prevent methods that retrieved the instance from using it after close, but hopefully will - * catch most cases; otherwise, we'll need some kind of locking. - */ - DB db() { - DB _db = this._db.get(); - if (_db == null) { - throw new IllegalStateException("DB is closed."); - } - return _db; - } - - private byte[] getTypeAlias(Class<?> klass) throws Exception { - byte[] alias = typeAliases.get(klass.getName()); - if (alias == null) { - synchronized (typeAliases) { - byte[] tmp = String.valueOf(typeAliases.size()).getBytes(UTF_8); - alias = typeAliases.putIfAbsent(klass.getName(), tmp); - if (alias == null) { - alias = tmp; - put(TYPE_ALIASES_KEY, new TypeAliases(typeAliases)); - } - } - } - return alias; - } - - /** Needs to be public for Jackson. */ - public static class TypeAliases { - - public Map<String, byte[]> aliases; - - TypeAliases(Map<String, byte[]> aliases) { - this.aliases = aliases; - } - - TypeAliases() { - this(null); - } - - } - - private static class PrefixCache { - - private final Object entity; - private final Map<LevelDBTypeInfo.Index, byte[]> prefixes; - - PrefixCache(Object entity) { - this.entity = entity; - this.prefixes = new HashMap<>(); - } - - byte[] getPrefix(LevelDBTypeInfo.Index idx) throws Exception { - byte[] prefix = null; - if (idx.isChild()) { - prefix = prefixes.get(idx.parent()); - if (prefix == null) { - prefix = idx.parent().childPrefix(idx.parent().getValue(entity)); - prefixes.put(idx.parent(), prefix); - } - } - return prefix; - } - - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java deleted file mode 100644 index 263d45c..0000000 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.kvstore; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import org.iq80.leveldb.DBIterator; - -class LevelDBIterator<T> implements KVStoreIterator<T> { - - private final LevelDB db; - private final boolean ascending; - private final DBIterator it; - private final Class<T> type; - private final LevelDBTypeInfo ti; - private final LevelDBTypeInfo.Index index; - private final byte[] indexKeyPrefix; - private final byte[] end; - private final long max; - - private boolean checkedNext; - private byte[] next; - private boolean closed; - private long count; - - LevelDBIterator(LevelDB db, KVStoreView<T> params) throws Exception { - this.db = db; - this.ascending = params.ascending; - this.it = db.db().iterator(); - this.type = params.type; - this.ti = db.getTypeInfo(type); - this.index = ti.index(params.index); - this.max = params.max; - - Preconditions.checkArgument(!index.isChild() || params.parent != null, - "Cannot iterate over child index %s without parent value.", params.index); - byte[] parent = index.isChild() ? index.parent().childPrefix(params.parent) : null; - - this.indexKeyPrefix = index.keyPrefix(parent); - - byte[] firstKey; - if (params.first != null) { - if (ascending) { - firstKey = index.start(parent, params.first); - } else { - firstKey = index.end(parent, params.first); - } - } else if (ascending) { - firstKey = index.keyPrefix(parent); - } else { - firstKey = index.end(parent); - } - it.seek(firstKey); - - byte[] end = null; - if (ascending) { - if (params.last != null) { - end = index.end(parent, params.last); - } else { - end = index.end(parent); - } - } else { - if (params.last != null) { - end = index.start(parent, params.last); - } - if (it.hasNext()) { - // When descending, the caller may have set up the start of iteration at a non-existant - // entry that is guaranteed to be after the desired entry. For example, if you have a - // compound key (a, b) where b is a, integer, you may seek to the end of the elements that - // have the same "a" value by specifying Integer.MAX_VALUE for "b", and that value may not - // exist in the database. So need to check here whether the next value actually belongs to - // the set being returned by the iterator before advancing. - byte[] nextKey = it.peekNext().getKey(); - if (compare(nextKey, indexKeyPrefix) <= 0) { - it.next(); - } - } - } - this.end = end; - - if (params.skip > 0) { - skip(params.skip); - } - } - - @Override - public boolean hasNext() { - if (!checkedNext && !closed) { - next = loadNext(); - checkedNext = true; - } - if (!closed && next == null) { - try { - close(); - } catch (IOException ioe) { - throw Throwables.propagate(ioe); - } - } - return next != null; - } - - @Override - public T next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - checkedNext = false; - - try { - T ret; - if (index == null || index.isCopy()) { - ret = db.serializer.deserialize(next, type); - } else { - byte[] key = ti.buildKey(false, ti.naturalIndex().keyPrefix(null), next); - ret = db.get(key, type); - } - next = null; - return ret; - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - @Override - public List<T> next(int max) { - List<T> list = new ArrayList<>(max); - while (hasNext() && list.size() < max) { - list.add(next()); - } - return list; - } - - @Override - public boolean skip(long n) { - long skipped = 0; - while (skipped < n) { - if (next != null) { - checkedNext = false; - next = null; - skipped++; - continue; - } - - boolean hasNext = ascending ? it.hasNext() : it.hasPrev(); - if (!hasNext) { - checkedNext = true; - return false; - } - - Map.Entry<byte[], byte[]> e = ascending ? it.next() : it.prev(); - if (!isEndMarker(e.getKey())) { - skipped++; - } - } - - return hasNext(); - } - - @Override - public synchronized void close() throws IOException { - if (!closed) { - it.close(); - closed = true; - } - } - - private byte[] loadNext() { - if (count >= max) { - return null; - } - - try { - while (true) { - boolean hasNext = ascending ? it.hasNext() : it.hasPrev(); - if (!hasNext) { - return null; - } - - Map.Entry<byte[], byte[]> nextEntry; - try { - // Avoid races if another thread is updating the DB. - nextEntry = ascending ? it.next() : it.prev(); - } catch (NoSuchElementException e) { - return null; - } - - byte[] nextKey = nextEntry.getKey(); - // Next key is not part of the index, stop. - if (!startsWith(nextKey, indexKeyPrefix)) { - return null; - } - - // If the next key is an end marker, then skip it. - if (isEndMarker(nextKey)) { - continue; - } - - // If there's a known end key and iteration has gone past it, stop. - if (end != null) { - int comp = compare(nextKey, end) * (ascending ? 1 : -1); - if (comp > 0) { - return null; - } - } - - count++; - - // Next element is part of the iteration, return it. - return nextEntry.getValue(); - } - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @VisibleForTesting - static boolean startsWith(byte[] key, byte[] prefix) { - if (key.length < prefix.length) { - return false; - } - - for (int i = 0; i < prefix.length; i++) { - if (key[i] != prefix[i]) { - return false; - } - } - - return true; - } - - private boolean isEndMarker(byte[] key) { - return (key.length > 2 && - key[key.length - 2] == LevelDBTypeInfo.KEY_SEPARATOR && - key[key.length - 1] == LevelDBTypeInfo.END_MARKER[0]); - } - - static int compare(byte[] a, byte[] b) { - int diff = 0; - int minLen = Math.min(a.length, b.length); - for (int i = 0; i < minLen; i++) { - diff += (a[i] - b[i]); - if (diff != 0) { - return diff; - } - } - - return a.length - b.length; - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java deleted file mode 100644 index 722f54e..0000000 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java +++ /dev/null @@ -1,511 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.kvstore; - -import java.lang.reflect.Array; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import static java.nio.charset.StandardCharsets.UTF_8; - -import com.google.common.base.Preconditions; -import org.iq80.leveldb.WriteBatch; - -/** - * Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected - * via reflection, to make it cheaper to access it multiple times. - * - * <p> - * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures - * that iteration over indices is easy, and that updating values in the store is not overly - * expensive. Of note, indices choose using more disk space (one value per key) instead of keeping - * lists of pointers, which would be more expensive to update at runtime. - * </p> - * - * <p> - * Indentation defines when a sub-key lives under a parent key. In LevelDB, this means the full - * key would be the concatenation of everything up to that point in the hierarchy, with each - * component separated by a NULL byte. - * </p> - * - * <pre> - * +TYPE_NAME - * NATURAL_INDEX - * +NATURAL_KEY - * - - * -NATURAL_INDEX - * INDEX_NAME - * +INDEX_VALUE - * +NATURAL_KEY - * -INDEX_VALUE - * .INDEX_VALUE - * CHILD_INDEX_NAME - * +CHILD_INDEX_VALUE - * NATURAL_KEY_OR_DATA - * - - * -INDEX_NAME - * </pre> - * - * <p> - * Entity data (either the entity's natural key or a copy of the data) is stored in all keys - * that end with "+<something>". A count of all objects that match a particular top-level index - * value is kept at the end marker ("-<something>"). A count is also kept at the natural index's end - * marker, to make it easy to retrieve the number of all elements of a particular type. - * </p> - * - * <p> - * To illustrate, given a type "Foo", with a natural index and a second index called "bar", you'd - * have these keys and values in the store for two instances, one with natural key "key1" and the - * other "key2", both with value "yes" for "bar": - * </p> - * - * <pre> - * Foo __main__ +key1 [data for instance 1] - * Foo __main__ +key2 [data for instance 2] - * Foo __main__ - [count of all Foo] - * Foo bar +yes +key1 [instance 1 key or data, depending on index type] - * Foo bar +yes +key2 [instance 2 key or data, depending on index type] - * Foo bar +yes - [count of all Foo with "bar=yes" ] - * </pre> - * - * <p> - * Note that all indexed values are prepended with "+", even if the index itself does not have an - * explicit end marker. This allows for easily skipping to the end of an index by telling LevelDB - * to seek to the "phantom" end marker of the index. Throughout the code and comments, this part - * of the full LevelDB key is generally referred to as the "index value" of the entity. - * </p> - * - * <p> - * Child indices are stored after their parent index. In the example above, let's assume there is - * a child index "child", whose parent is "bar". If both instances have value "no" for this field, - * the data in the store would look something like the following: - * </p> - * - * <pre> - * ... - * Foo bar +yes - - * Foo bar .yes .child +no +key1 [instance 1 key or data, depending on index type] - * Foo bar .yes .child +no +key2 [instance 2 key or data, depending on index type] - * ... - * </pre> - */ -class LevelDBTypeInfo { - - static final byte[] END_MARKER = new byte[] { '-' }; - static final byte ENTRY_PREFIX = (byte) '+'; - static final byte KEY_SEPARATOR = 0x0; - static byte TRUE = (byte) '1'; - static byte FALSE = (byte) '0'; - - private static final byte SECONDARY_IDX_PREFIX = (byte) '.'; - private static final byte POSITIVE_MARKER = (byte) '='; - private static final byte NEGATIVE_MARKER = (byte) '*'; - private static final byte[] HEX_BYTES = new byte[] { - '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' - }; - - private final LevelDB db; - private final Class<?> type; - private final Map<String, Index> indices; - private final byte[] typePrefix; - - LevelDBTypeInfo(LevelDB db, Class<?> type, byte[] alias) throws Exception { - this.db = db; - this.type = type; - this.indices = new HashMap<>(); - - KVTypeInfo ti = new KVTypeInfo(type); - - // First create the parent indices, then the child indices. - ti.indices().forEach(idx -> { - if (idx.parent().isEmpty()) { - indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()), null)); - } - }); - ti.indices().forEach(idx -> { - if (!idx.parent().isEmpty()) { - indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()), - indices.get(idx.parent()))); - } - }); - - this.typePrefix = alias; - } - - Class<?> type() { - return type; - } - - byte[] keyPrefix() { - return typePrefix; - } - - Index naturalIndex() { - return index(KVIndex.NATURAL_INDEX_NAME); - } - - Index index(String name) { - Index i = indices.get(name); - Preconditions.checkArgument(i != null, "Index %s does not exist for type %s.", name, - type.getName()); - return i; - } - - Collection<Index> indices() { - return indices.values(); - } - - byte[] buildKey(byte[]... components) { - return buildKey(true, components); - } - - byte[] buildKey(boolean addTypePrefix, byte[]... components) { - int len = 0; - if (addTypePrefix) { - len += typePrefix.length + 1; - } - for (byte[] comp : components) { - len += comp.length; - } - len += components.length - 1; - - byte[] dest = new byte[len]; - int written = 0; - - if (addTypePrefix) { - System.arraycopy(typePrefix, 0, dest, 0, typePrefix.length); - dest[typePrefix.length] = KEY_SEPARATOR; - written += typePrefix.length + 1; - } - - for (byte[] comp : components) { - System.arraycopy(comp, 0, dest, written, comp.length); - written += comp.length; - if (written < dest.length) { - dest[written] = KEY_SEPARATOR; - written++; - } - } - - return dest; - } - - /** - * Models a single index in LevelDB. See top-level class's javadoc for a description of how the - * keys are generated. - */ - class Index { - - private final boolean copy; - private final boolean isNatural; - private final byte[] name; - private final KVTypeInfo.Accessor accessor; - private final Index parent; - - private Index(KVIndex self, KVTypeInfo.Accessor accessor, Index parent) { - byte[] name = self.value().getBytes(UTF_8); - if (parent != null) { - byte[] child = new byte[name.length + 1]; - child[0] = SECONDARY_IDX_PREFIX; - System.arraycopy(name, 0, child, 1, name.length); - } - - this.name = name; - this.isNatural = self.value().equals(KVIndex.NATURAL_INDEX_NAME); - this.copy = isNatural || self.copy(); - this.accessor = accessor; - this.parent = parent; - } - - boolean isCopy() { - return copy; - } - - boolean isChild() { - return parent != null; - } - - Index parent() { - return parent; - } - - /** - * Creates a key prefix for child indices of this index. This allows the prefix to be - * calculated only once, avoiding redundant work when multiple child indices of the - * same parent index exist. - */ - byte[] childPrefix(Object value) throws Exception { - Preconditions.checkState(parent == null, "Not a parent index."); - return buildKey(name, toParentKey(value)); - } - - /** - * Gets the index value for a particular entity (which is the value of the field or method - * tagged with the index annotation). This is used as part of the LevelDB key where the - * entity (or its id) is stored. - */ - Object getValue(Object entity) throws Exception { - return accessor.get(entity); - } - - private void checkParent(byte[] prefix) { - if (prefix != null) { - Preconditions.checkState(parent != null, "Parent prefix provided for parent index."); - } else { - Preconditions.checkState(parent == null, "Parent prefix missing for child index."); - } - } - - /** The prefix for all keys that belong to this index. */ - byte[] keyPrefix(byte[] prefix) { - checkParent(prefix); - return (parent != null) ? buildKey(false, prefix, name) : buildKey(name); - } - - /** - * The key where to start ascending iteration for entities whose value for the indexed field - * match the given value. - */ - byte[] start(byte[] prefix, Object value) { - checkParent(prefix); - return (parent != null) ? buildKey(false, prefix, name, toKey(value)) - : buildKey(name, toKey(value)); - } - - /** The key for the index's end marker. */ - byte[] end(byte[] prefix) { - checkParent(prefix); - return (parent != null) ? buildKey(false, prefix, name, END_MARKER) - : buildKey(name, END_MARKER); - } - - /** The key for the end marker for entries with the given value. */ - byte[] end(byte[] prefix, Object value) throws Exception { - checkParent(prefix); - return (parent != null) ? buildKey(false, prefix, name, toKey(value), END_MARKER) - : buildKey(name, toKey(value), END_MARKER); - } - - /** The full key in the index that identifies the given entity. */ - byte[] entityKey(byte[] prefix, Object entity) throws Exception { - Object indexValue = getValue(entity); - Preconditions.checkNotNull(indexValue, "Null index value for %s in type %s.", - name, type.getName()); - byte[] entityKey = start(prefix, indexValue); - if (!isNatural) { - entityKey = buildKey(false, entityKey, toKey(naturalIndex().getValue(entity))); - } - return entityKey; - } - - private void updateCount(WriteBatch batch, byte[] key, long delta) throws Exception { - long updated = getCount(key) + delta; - if (updated > 0) { - batch.put(key, db.serializer.serialize(updated)); - } else { - batch.delete(key); - } - } - - private void addOrRemove( - WriteBatch batch, - Object entity, - Object existing, - byte[] data, - byte[] naturalKey, - byte[] prefix) throws Exception { - Object indexValue = getValue(entity); - Preconditions.checkNotNull(indexValue, "Null index value for %s in type %s.", - name, type.getName()); - - byte[] entityKey = start(prefix, indexValue); - if (!isNatural) { - entityKey = buildKey(false, entityKey, naturalKey); - } - - boolean needCountUpdate = (existing == null); - - // Check whether there's a need to update the index. The index needs to be updated in two - // cases: - // - // - There is no existing value for the entity, so a new index value will be added. - // - If there is a previously stored value for the entity, and the index value for the - // current index does not match the new value, the old entry needs to be deleted and - // the new one added. - // - // Natural indices don't need to be checked, because by definition both old and new entities - // will have the same key. The put() call is all that's needed in that case. - // - // Also check whether we need to update the counts. If the indexed value is changing, we - // need to decrement the count at the old index value, and the new indexed value count needs - // to be incremented. - if (existing != null && !isNatural) { - byte[] oldPrefix = null; - Object oldIndexedValue = getValue(existing); - boolean removeExisting = !indexValue.equals(oldIndexedValue); - if (!removeExisting && isChild()) { - oldPrefix = parent().childPrefix(parent().getValue(existing)); - removeExisting = LevelDBIterator.compare(prefix, oldPrefix) != 0; - } - - if (removeExisting) { - if (oldPrefix == null && isChild()) { - oldPrefix = parent().childPrefix(parent().getValue(existing)); - } - - byte[] oldKey = entityKey(oldPrefix, existing); - batch.delete(oldKey); - - // If the indexed value has changed, we need to update the counts at the old and new - // end markers for the indexed value. - if (!isChild()) { - byte[] oldCountKey = end(null, oldIndexedValue); - updateCount(batch, oldCountKey, -1L); - needCountUpdate = true; - } - } - } - - if (data != null) { - byte[] stored = copy ? data : naturalKey; - batch.put(entityKey, stored); - } else { - batch.delete(entityKey); - } - - if (needCountUpdate && !isChild()) { - long delta = data != null ? 1L : -1L; - byte[] countKey = isNatural ? end(prefix) : end(prefix, indexValue); - updateCount(batch, countKey, delta); - } - } - - /** - * Add an entry to the index. - * - * @param batch Write batch with other related changes. - * @param entity The entity being added to the index. - * @param existing The entity being replaced in the index, or null. - * @param data Serialized entity to store (when storing the entity, not a reference). - * @param naturalKey The value's natural key (to avoid re-computing it for every index). - * @param prefix The parent index prefix, if this is a child index. - */ - void add( - WriteBatch batch, - Object entity, - Object existing, - byte[] data, - byte[] naturalKey, - byte[] prefix) throws Exception { - addOrRemove(batch, entity, existing, data, naturalKey, prefix); - } - - /** - * Remove a value from the index. - * - * @param batch Write batch with other related changes. - * @param entity The entity being removed, to identify the index entry to modify. - * @param naturalKey The value's natural key (to avoid re-computing it for every index). - * @param prefix The parent index prefix, if this is a child index. - */ - void remove( - WriteBatch batch, - Object entity, - byte[] naturalKey, - byte[] prefix) throws Exception { - addOrRemove(batch, entity, null, null, naturalKey, prefix); - } - - long getCount(byte[] key) throws Exception { - byte[] data = db.db().get(key); - return data != null ? db.serializer.deserializeLong(data) : 0; - } - - byte[] toParentKey(Object value) { - return toKey(value, SECONDARY_IDX_PREFIX); - } - - byte[] toKey(Object value) { - return toKey(value, ENTRY_PREFIX); - } - - /** - * Translates a value to be used as part of the store key. - * - * Integral numbers are encoded as a string in a way that preserves lexicographical - * ordering. The string is prepended with a marker telling whether the number is negative - * or positive ("*" for negative and "=" for positive are used since "-" and "+" have the - * opposite of the desired order), and then the number is encoded into a hex string (so - * it occupies twice the number of bytes as the original type). - * - * Arrays are encoded by encoding each element separately, separated by KEY_SEPARATOR. - */ - byte[] toKey(Object value, byte prefix) { - final byte[] result; - - if (value instanceof String) { - byte[] str = ((String) value).getBytes(UTF_8); - result = new byte[str.length + 1]; - result[0] = prefix; - System.arraycopy(str, 0, result, 1, str.length); - } else if (value instanceof Boolean) { - result = new byte[] { prefix, (Boolean) value ? TRUE : FALSE }; - } else if (value.getClass().isArray()) { - int length = Array.getLength(value); - byte[][] components = new byte[length][]; - for (int i = 0; i < length; i++) { - components[i] = toKey(Array.get(value, i)); - } - result = buildKey(false, components); - } else { - int bytes; - - if (value instanceof Integer) { - bytes = Integer.SIZE; - } else if (value instanceof Long) { - bytes = Long.SIZE; - } else if (value instanceof Short) { - bytes = Short.SIZE; - } else if (value instanceof Byte) { - bytes = Byte.SIZE; - } else { - throw new IllegalArgumentException(String.format("Type %s not allowed as key.", - value.getClass().getName())); - } - - bytes = bytes / Byte.SIZE; - - byte[] key = new byte[bytes * 2 + 2]; - long longValue = ((Number) value).longValue(); - key[0] = prefix; - key[1] = longValue > 0 ? POSITIVE_MARKER : NEGATIVE_MARKER; - - for (int i = 0; i < key.length - 2; i++) { - int masked = (int) ((longValue >>> (4 * i)) & 0xF); - key[key.length - i - 1] = HEX_BYTES[masked]; - } - - result = key; - } - - return result; - } - - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/kvstore/UnsupportedStoreVersionException.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/UnsupportedStoreVersionException.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/UnsupportedStoreVersionException.java deleted file mode 100644 index 2ed246e..0000000 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/UnsupportedStoreVersionException.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.kvstore; - -import java.io.IOException; - -/** - * Exception thrown when the store implementation is not compatible with the underlying data. - */ -public class UnsupportedStoreVersionException extends IOException { - -} http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java new file mode 100644 index 0000000..9bc8c55 --- /dev/null +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/ArrayWrappers.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.kvstore; + +import java.util.Arrays; + +import com.google.common.base.Preconditions; + +/** + * A factory for array wrappers so that arrays can be used as keys in a map, sorted or not. + * + * The comparator implementation makes two assumptions: + * - All elements are instances of Comparable + * - When comparing two arrays, they both contain elements of the same type in corresponding + * indices. + * + * Otherwise, ClassCastExceptions may occur. The equality method can compare any two arrays. + * + * This class is not efficient and is mostly meant to compare really small arrays, like those + * generally used as indices and keys in a KVStore. + */ +class ArrayWrappers { + + @SuppressWarnings("unchecked") + public static Comparable<Object> forArray(Object a) { + Preconditions.checkArgument(a.getClass().isArray()); + Comparable<?> ret; + if (a instanceof int[]) { + ret = new ComparableIntArray((int[]) a); + } else if (a instanceof long[]) { + ret = new ComparableLongArray((long[]) a); + } else if (a instanceof byte[]) { + ret = new ComparableByteArray((byte[]) a); + } else { + Preconditions.checkArgument(!a.getClass().getComponentType().isPrimitive()); + ret = new ComparableObjectArray((Object[]) a); + } + return (Comparable<Object>) ret; + } + + private static class ComparableIntArray implements Comparable<ComparableIntArray> { + + private final int[] array; + + ComparableIntArray(int[] array) { + this.array = array; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof ComparableIntArray)) { + return false; + } + return Arrays.equals(array, ((ComparableIntArray) other).array); + } + + @Override + public int hashCode() { + int code = 0; + for (int i = 0; i < array.length; i++) { + code = (code * 31) + array[i]; + } + return code; + } + + @Override + public int compareTo(ComparableIntArray other) { + int len = Math.min(array.length, other.array.length); + for (int i = 0; i < len; i++) { + int diff = array[i] - other.array[i]; + if (diff != 0) { + return diff; + } + } + + return array.length - other.array.length; + } + } + + private static class ComparableLongArray implements Comparable<ComparableLongArray> { + + private final long[] array; + + ComparableLongArray(long[] array) { + this.array = array; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof ComparableLongArray)) { + return false; + } + return Arrays.equals(array, ((ComparableLongArray) other).array); + } + + @Override + public int hashCode() { + int code = 0; + for (int i = 0; i < array.length; i++) { + code = (code * 31) + (int) array[i]; + } + return code; + } + + @Override + public int compareTo(ComparableLongArray other) { + int len = Math.min(array.length, other.array.length); + for (int i = 0; i < len; i++) { + long diff = array[i] - other.array[i]; + if (diff != 0) { + return diff > 0 ? 1 : -1; + } + } + + return array.length - other.array.length; + } + } + + private static class ComparableByteArray implements Comparable<ComparableByteArray> { + + private final byte[] array; + + ComparableByteArray(byte[] array) { + this.array = array; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof ComparableByteArray)) { + return false; + } + return Arrays.equals(array, ((ComparableByteArray) other).array); + } + + @Override + public int hashCode() { + int code = 0; + for (int i = 0; i < array.length; i++) { + code = (code * 31) + array[i]; + } + return code; + } + + @Override + public int compareTo(ComparableByteArray other) { + int len = Math.min(array.length, other.array.length); + for (int i = 0; i < len; i++) { + int diff = array[i] - other.array[i]; + if (diff != 0) { + return diff; + } + } + + return array.length - other.array.length; + } + } + + private static class ComparableObjectArray implements Comparable<ComparableObjectArray> { + + private final Object[] array; + + ComparableObjectArray(Object[] array) { + this.array = array; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof ComparableObjectArray)) { + return false; + } + return Arrays.equals(array, ((ComparableObjectArray) other).array); + } + + @Override + public int hashCode() { + int code = 0; + for (int i = 0; i < array.length; i++) { + code = (code * 31) + array[i].hashCode(); + } + return code; + } + + @Override + @SuppressWarnings("unchecked") + public int compareTo(ComparableObjectArray other) { + int len = Math.min(array.length, other.array.length); + for (int i = 0; i < len; i++) { + int diff = ((Comparable<Object>) array[i]).compareTo((Comparable<Object>) other.array[i]); + if (diff != 0) { + return diff; + } + } + + return array.length - other.array.length; + } + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org