// http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
// ----------------------------------------------------------------------
// diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
// new file mode 100644
// index 0000000..9cae5da
// --- /dev/null
// +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
// @@ -0,0 +1,320 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util.kvstore;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;

import org.apache.spark.annotation.Private;

/**
 * Implementation of KVStore that keeps data deserialized in memory. This store does not index
 * data; instead, whenever iterating over an indexed field, the stored data is copied and sorted
 * according to the index. This saves memory but makes iteration more expensive.
 */
@Private
public class InMemoryStore implements KVStore {

  private Object metadata;
  private ConcurrentMap<Class<?>, InstanceList> data = new ConcurrentHashMap<>();

  /** Returns the stored metadata cast to {@code klass}, or null if none has been set. */
  @Override
  public <T> T getMetadata(Class<T> klass) {
    return klass.cast(metadata);
  }

  /** Replaces the stored metadata object; passing null clears it. */
  @Override
  public void setMetadata(Object value) {
    this.metadata = value;
  }

  /** Returns the number of stored instances of {@code type}; 0 if the type was never written. */
  @Override
  public long count(Class<?> type) {
    InstanceList list = data.get(type);
    return list != null ? list.size() : 0;
  }

  /**
   * Counts the stored instances of {@code type} whose value for {@code index} equals
   * {@code indexedValue}.
   *
   * @return The number of matches; 0 if no instance of the type has been written.
   * @throws IllegalArgumentException If the type is known but has no index with the given name.
   */
  @Override
  public long count(Class<?> type, String index, Object indexedValue) throws Exception {
    InstanceList list = data.get(type);
    if (list == null) {
      // Nothing of this type was ever written; mirror count(Class) instead of failing with NPE.
      return 0;
    }

    Object comparable = asKey(indexedValue);
    KVTypeInfo.Accessor accessor = list.getIndexAccessor(index);
    long count = 0;
    for (Object o : view(type)) {
      if (Objects.equal(comparable, asKey(accessor.get(o)))) {
        count++;
      }
    }
    return count;
  }

  /**
   * Reads the instance of {@code klass} stored under {@code naturalKey}.
   *
   * @throws NoSuchElementException If no such instance exists.
   */
  @Override
  public <T> T read(Class<T> klass, Object naturalKey) {
    InstanceList list = data.get(klass);
    Object value = list != null ? list.get(naturalKey) : null;
    if (value == null) {
      throw new NoSuchElementException();
    }
    return klass.cast(value);
  }

  /**
   * Stores {@code value} under its natural key, creating the per-type list on first use and
   * replacing any instance previously stored under the same key.
   */
  @Override
  public void write(Object value) throws Exception {
    InstanceList list = data.computeIfAbsent(value.getClass(), key -> {
      try {
        return new InstanceList(key);
      } catch (Exception e) {
        // The mapping function cannot throw checked exceptions.
        throw Throwables.propagate(e);
      }
    });
    list.put(value);
  }

  /** Removes the instance of {@code type} stored under {@code naturalKey}, if there is one. */
  @Override
  public void delete(Class<?> type, Object naturalKey) {
    InstanceList list = data.get(type);
    if (list != null) {
      list.delete(naturalKey);
    }
  }

  /** Returns a view over the stored instances of {@code type}; empty if none were written. */
  @Override
  public <T> KVStoreView<T> view(Class<T> type) {
    InstanceList list = data.get(type);
    return list != null ? list.view(type)
      : new InMemoryView<>(type, Collections.<T>emptyList(), null);
  }

  /** Drops the metadata and all stored instances. */
  @Override
  public void close() {
    metadata = null;
    data.clear();
  }

  /**
   * Converts an index value into something usable as a map key and for comparisons. Arrays are
   * wrapped via {@code ArrayWrappers.forArray}; everything else is cast to Comparable as is.
   */
  @SuppressWarnings("unchecked")
  private static Comparable<Object> asKey(Object in) {
    if (in.getClass().isArray()) {
      in = ArrayWrappers.forArray(in);
    }
    return (Comparable<Object>) in;
  }

  /** Holds all the instances of a single type, keyed by their natural key. */
  private static class InstanceList {

    private final KVTypeInfo ti;
    private final KVTypeInfo.Accessor naturalKey;
    private final ConcurrentMap<Comparable<Object>, Object> data;

    private InstanceList(Class<?> type) throws Exception {
      this.ti = new KVTypeInfo(type);
      this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
      this.data = new ConcurrentHashMap<>();
    }

    KVTypeInfo.Accessor getIndexAccessor(String indexName) {
      return ti.getAccessor(indexName);
    }

    public Object get(Object key) {
      return data.get(asKey(key));
    }

    public void put(Object value) throws Exception {
      Preconditions.checkArgument(ti.type().equals(value.getClass()),
        "Unexpected type: %s", value.getClass());
      data.put(asKey(naturalKey.get(value)), value);
    }

    public void delete(Object key) {
      data.remove(asKey(key));
    }

    public int size() {
      // Ask the map rather than keeping a separate counter: a plain int updated with ++/--
      // can lose updates when several threads write or delete concurrently.
      return data.size();
    }

    @SuppressWarnings("unchecked")
    public <T> InMemoryView<T> view(Class<T> type) {
      Preconditions.checkArgument(ti.type().equals(type), "Unexpected type: %s", type);
      // Safe: put() only accepts instances whose class is exactly ti.type().
      Collection<T> all = (Collection<T>) data.values();
      return new InMemoryView<>(type, all, ti);
    }

  }

  /** View that sorts and filters a copy of the stored elements each time it is iterated. */
  private static class InMemoryView<T> extends KVStoreView<T> {

    private final Collection<T> elements;
    private final KVTypeInfo ti;
    private final KVTypeInfo.Accessor natural;

    InMemoryView(Class<T> type, Collection<T> elements, KVTypeInfo ti) {
      super(type);
      this.elements = elements;
      this.ti = ti;
      // ti is null only for the empty view returned for types that were never written.
      this.natural = ti != null ? ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) : null;
    }

    @Override
    public Iterator<T> iterator() {
      if (elements.isEmpty()) {
        return new InMemoryIterator<>(elements.iterator());
      }

      try {
        KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null;
        // Flips the sign of every comparison when iterating in descending order.
        int modifier = ascending ? 1 : -1;

        final List<T> sorted = copyElements();
        sorted.sort((e1, e2) -> modifier * compare(e1, e2, getter));
        Stream<T> stream = sorted.stream();

        if (first != null) {
          stream = stream.filter(e -> modifier * compare(e, getter, first) >= 0);
        }

        if (last != null) {
          stream = stream.filter(e -> modifier * compare(e, getter, last) <= 0);
        }

        if (skip > 0) {
          stream = stream.skip(skip);
        }

        if (max < sorted.size()) {
          stream = stream.limit(max);
        }

        return new InMemoryIterator<>(stream.iterator());
      } catch (Exception e) {
        throw Throwables.propagate(e);
      }
    }

    /**
     * Create a copy of the input elements, filtering the values for child indices if needed.
     */
    private List<T> copyElements() {
      if (parent != null) {
        KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index);
        Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index.");

        return elements.stream()
          .filter(e -> compare(e, parentGetter, parent) == 0)
          .collect(Collectors.toList());
      } else {
        return new ArrayList<>(elements);
      }
    }

    /** Compares two elements by the given index, breaking ties with the natural index. */
    private int compare(T e1, T e2, KVTypeInfo.Accessor getter) {
      try {
        int diff = compare(e1, getter, getter.get(e2));
        if (diff == 0 && getter != natural) {
          diff = compare(e1, natural, natural.get(e2));
        }
        return diff;
      } catch (Exception e) {
        throw Throwables.propagate(e);
      }
    }

    /** Compares the value of an element's index against an already-extracted index value. */
    private int compare(T e1, KVTypeInfo.Accessor getter, Object v2) {
      try {
        return asKey(getter.get(e1)).compareTo(asKey(v2));
      } catch (Exception e) {
        throw Throwables.propagate(e);
      }
    }

  }

  /** Adapts a plain iterator to the KVStoreIterator interface; holds no closeable resources. */
  private static class InMemoryIterator<T> implements KVStoreIterator<T> {

    private final Iterator<T> iter;

    InMemoryIterator(Iterator<T> iter) {
      this.iter = iter;
    }

    @Override
    public boolean hasNext() {
      return iter.hasNext();
    }

    @Override
    public T next() {
      return iter.next();
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException();
    }

    @Override
    public List<T> next(int max) {
      List<T> list = new ArrayList<>(max);
      while (hasNext() && list.size() < max) {
        list.add(next());
      }
      return list;
    }

    @Override
    public boolean skip(long n) {
      long skipped = 0;
      while (skipped < n) {
        if (hasNext()) {
          next();
          skipped++;
        } else {
          return false;
        }
      }

      return hasNext();
    }

    @Override
    public void close() {
      // no op.
    }

  }

}
// http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVIndex.java
// ----------------------------------------------------------------------
// diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVIndex.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVIndex.java
// new file mode 100644
// index 0000000..80f4921
// --- /dev/null
// +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVIndex.java
// @@ -0,0 +1,85 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util.kvstore;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.apache.spark.annotation.Private;

/**
 * Tags a field to be indexed when storing an object.
 *
 * <p>
 * Types are required to have a natural index that uniquely identifies instances in the store.
 * The default value of the annotation identifies the natural index for the type.
 * </p>
 *
 * <p>
 * Indexes allow for more efficient sorting of data read from the store. By annotating a field or
 * "getter" method with this annotation, an index will be created that will provide sorting based on
 * the string value of that field.
 * </p>
 *
 * <p>
 * Note that creating indices means more space will be needed, and maintenance operations like
 * updating or deleting a value will become more expensive.
 * </p>
 *
 * <p>
 * Indices are restricted to String, integral types (byte, short, int, long, boolean), and arrays
 * of those values.
 * </p>
 */
@Private
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.METHOD})
public @interface KVIndex {

  String NATURAL_INDEX_NAME = "__main__";

  /**
   * The name of the index to be created for the annotated entity. Must be unique within
   * the class. Index names are not allowed to start with an underscore (that's reserved for
   * internal use). The default value is the natural index name (which is always a copy index
   * regardless of the annotation's values).
   */
  String value() default NATURAL_INDEX_NAME;

  /**
   * The name of the parent index of this index. By default there is no parent index, so the
   * generated data can be retrieved without having to provide a parent value.
   *
   * <p>
   * If a parent index is defined, iterating over the data using the index will require providing
   * a single value for the parent index. This serves as a rudimentary way to provide relationships
   * between entities in the store.
   * </p>
   */
  String parent() default "";

  /**
   * Whether to copy the instance's data to the index, instead of just storing a pointer to the
   * data. The default behavior is to just store a reference; that saves disk space but is slower
   * to read, since there's a level of indirection.
   */
  boolean copy() default false;

}

// http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStore.java
// ----------------------------------------------------------------------
// diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStore.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStore.java
// new file mode 100644
// index 0000000..72d06a8
// --- /dev/null
// +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStore.java
// @@ -0,0 +1,129 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util.kvstore;

import java.io.Closeable;

import org.apache.spark.annotation.Private;

/**
 * Abstraction for a local key/value store for storing app data.
 *
 * <p>
 * There are two main features provided by the implementations of this interface:
 * </p>
 *
 * <h3>Serialization</h3>
 *
 * <p>
 * If the underlying data store requires serialization, data will be serialized to and deserialized
 * using a {@link KVStoreSerializer}, which can be customized by the application. The serializer is
 * based on Jackson, so it supports all the Jackson annotations for controlling the serialization of
 * app-defined types.
 * </p>
 *
 * <p>
 * Data is also automatically compressed to save disk space.
 * </p>
 *
 * <h3>Automatic Key Management</h3>
 *
 * <p>
 * When using the built-in key management, the implementation will automatically create unique
 * keys for each type written to the store. Keys are based on the type name, and always start
 * with the "+" prefix character (so that it's easy to use both manual and automatic key
 * management APIs without conflicts).
 * </p>
 *
 * <p>
 * Another feature of automatic key management is indexing; by annotating fields or methods of
 * objects written to the store with {@link KVIndex}, indices are created to sort the data
 * by the values of those properties. This makes it possible to provide sorting without having
 * to load all instances of those types from the store.
 * </p>
 *
 * <p>
 * KVStore instances are thread-safe for both reads and writes.
 * </p>
 */
@Private
public interface KVStore extends Closeable {

  /**
   * Returns app-specific metadata from the store, or null if it's not currently set.
   *
   * <p>
   * The metadata type is application-specific. This is a convenience method so that applications
   * don't need to define their own keys for this information.
   * </p>
   */
  <T> T getMetadata(Class<T> klass) throws Exception;

  /**
   * Writes the given value in the store metadata key.
   */
  void setMetadata(Object value) throws Exception;

  /**
   * Read a specific instance of an object.
   *
   * @param naturalKey The object's "natural key", which uniquely identifies it. Null keys
   *                   are not allowed.
   * @throws java.util.NoSuchElementException If an element with the given key does not exist.
   */
  <T> T read(Class<T> klass, Object naturalKey) throws Exception;

  /**
   * Writes the given object to the store, including indexed fields. Indices are updated based
   * on the annotated fields of the object's class.
   *
   * <p>
   * Writes may be slower when the object already exists in the store, since it will involve
   * updating existing indices.
   * </p>
   *
   * @param value The object to write.
   */
  void write(Object value) throws Exception;

  /**
   * Removes an object and all data related to it, like index entries, from the store.
   *
   * @param type The object's type.
   * @param naturalKey The object's "natural key", which uniquely identifies it. Null keys
   *                   are not allowed.
   * @throws java.util.NoSuchElementException If an element with the given key does not exist.
   */
  void delete(Class<?> type, Object naturalKey) throws Exception;

  /**
   * Returns a configurable view for iterating over entities of the given type.
   */
  <T> KVStoreView<T> view(Class<T> type) throws Exception;

  /**
   * Returns the number of items of the given type currently in the store.
   */
  long count(Class<?> type) throws Exception;

  /**
   * Returns the number of items of the given type which match the given indexed value.
   */
  long count(Class<?> type, String index, Object indexedValue) throws Exception;

}

// http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreIterator.java
// ----------------------------------------------------------------------
// diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreIterator.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreIterator.java
// new file mode 100644
// index 0000000..28a432b
// --- /dev/null
// +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreIterator.java
// @@ -0,0 +1,50 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util.kvstore;

import java.util.Iterator;
import java.util.List;

import org.apache.spark.annotation.Private;

/**
 * An iterator for KVStore.
 *
 * <p>
 * Iterators may keep references to resources that need to be closed. It's recommended that users
 * explicitly close iterators after they're used.
 * </p>
 */
@Private
public interface KVStoreIterator<T> extends Iterator<T>, AutoCloseable {

  /**
   * Retrieve multiple elements from the store.
   *
   * @param max Maximum number of elements to retrieve.
   */
  List<T> next(int max);

  /**
   * Skip in the iterator.
   *
   * @return Whether there are items left after skipping.
   */
  boolean skip(long n);

}

// http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
// ----------------------------------------------------------------------
// diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
// new file mode 100644
// index 0000000..bd8d948
// --- /dev/null
// +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
// @@ -0,0 +1,89 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util.kvstore;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.spark.annotation.Private;

/**
 * Serializer used to translate between app-defined types and the LevelDB store.
 *
 * <p>
 * The serializer is based on Jackson, so values are written as JSON. It also allows "naked strings"
 * and integers to be written as values directly, which will be written as UTF-8 strings.
 * </p>
 */
@Private
public class KVStoreSerializer {

  /**
   * Object mapper used to process app-specific types. If an application requires a specific
   * configuration of the mapper, it can subclass this serializer and add custom configuration
   * to this object.
   */
  protected final ObjectMapper mapper;

  public KVStoreSerializer() {
    this.mapper = new ObjectMapper();
  }

  /**
   * Serializes a value. Strings are written as raw UTF-8 bytes; any other value is written
   * through the object mapper into a gzip-compressed byte array.
   */
  public final byte[] serialize(Object o) throws Exception {
    if (o instanceof String) {
      return ((String) o).getBytes(UTF_8);
    }

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    // try-with-resources: the gzip trailer is flushed on close, and a failure while closing
    // cannot mask an exception thrown by the mapper.
    try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
      mapper.writeValue(out, o);
    }
    return bytes.toByteArray();
  }

  /**
   * Reverses {@link #serialize(Object)}: Strings are decoded from UTF-8 directly; any other
   * type is read through the object mapper from gzip-compressed data.
   */
  @SuppressWarnings("unchecked")
  public final <T> T deserialize(byte[] data, Class<T> klass) throws Exception {
    if (klass.equals(String.class)) {
      return (T) new String(data, UTF_8);
    }

    try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
      return mapper.readValue(in, klass);
    }
  }

  /** Serializes a long as the UTF-8 bytes of its decimal string form. */
  final byte[] serialize(long value) {
    return String.valueOf(value).getBytes(UTF_8);
  }

  /** Parses a long previously written by {@link #serialize(long)}. */
  final long deserializeLong(byte[] data) {
    return Long.parseLong(new String(data, UTF_8));
  }

}

// http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreView.java
// ----------------------------------------------------------------------
// diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreView.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreView.java
// new file mode 100644
// index 0000000..8ea79bb
// --- /dev/null
// +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreView.java
// @@ -0,0 +1,126 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util.kvstore;

import com.google.common.base.Preconditions;

import org.apache.spark.annotation.Private;

/**
 * A configurable view that allows iterating over values in a {@link KVStore}.
 *
 * <p>
 * The different methods can be used to configure the behavior of the iterator. Calling the same
 * method multiple times is allowed; the most recent value will be used.
 * </p>
 *
 * <p>
 * The iterators returned by this view are of type {@link KVStoreIterator}; they auto-close
 * when used in a for loop that exhausts their contents, but when used manually, they need
 * to be closed explicitly unless all elements are read.
 * </p>
 */
@Private
public abstract class KVStoreView<T> implements Iterable<T> {

  final Class<T> type;

  boolean ascending = true;
  String index = KVIndex.NATURAL_INDEX_NAME;
  Object first = null;
  Object last = null;
  Object parent = null;
  long skip = 0L;
  long max = Long.MAX_VALUE;

  public KVStoreView(Class<T> type) {
    this.type = type;
  }

  /**
   * Reverses the order of iteration. By default, iterates in ascending order.
   */
  public KVStoreView<T> reverse() {
    ascending = !ascending;
    return this;
  }

  /**
   * Iterates according to the given index.
   */
  public KVStoreView<T> index(String name) {
    this.index = Preconditions.checkNotNull(name);
    return this;
  }

  /**
   * Defines the value of the parent index when iterating over a child index. Only elements that
   * match the parent index's value will be included in the iteration.
   *
   * <p>
   * Required for iterating over child indices, will generate an error if iterating over a
   * parent-less index.
   * </p>
   */
  public KVStoreView<T> parent(Object value) {
    this.parent = value;
    return this;
  }

  /**
   * Iterates starting at the given value of the chosen index (inclusive).
   */
  public KVStoreView<T> first(Object value) {
    this.first = value;
    return this;
  }

  /**
   * Stops iteration at the given value of the chosen index (inclusive).
   */
  public KVStoreView<T> last(Object value) {
    this.last = value;
    return this;
  }

  /**
   * Stops iteration after a number of elements has been retrieved.
   */
  public KVStoreView<T> max(long max) {
    Preconditions.checkArgument(max > 0L, "max must be positive.");
    this.max = max;
    return this;
  }

  /**
   * Skips a number of elements at the start of iteration. Skipped elements are not accounted
   * when using {@link #max(long)}.
   */
  public KVStoreView<T> skip(long n) {
    this.skip = n;
    return this;
  }

  /**
   * Returns an iterator for the current configuration.
   */
  public KVStoreIterator<T> closeableIterator() throws Exception {
    // Relies on subclasses returning a KVStoreIterator from iterator().
    return (KVStoreIterator<T>) iterator();
  }

}

// http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
// ----------------------------------------------------------------------
// diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
// new file mode 100644
// index 0000000..a2b077e
// --- /dev/null
// +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
// @@ -0,0 +1,157 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util.kvstore;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import com.google.common.base.Preconditions;

import org.apache.spark.annotation.Private;

/**
 * Wrapper around types managed in a KVStore, providing easy access to their indexed fields.
 */
@Private
public class KVTypeInfo {

  private final Class<?> type;
  private final Map<String, KVIndex> indices;
  private final Map<String, Accessor> accessors;

  /**
   * Scans the fields and methods declared by {@code type} for {@link KVIndex} annotations and
   * validates the resulting index definitions.
   *
   * @throws IllegalArgumentException If an index definition is invalid: missing natural index,
   *         duplicate or reserved name, annotated method with parameters, or bad parent.
   */
  public KVTypeInfo(Class<?> type) throws Exception {
    this.type = type;
    this.accessors = new HashMap<>();
    this.indices = new HashMap<>();

    for (Field f : type.getDeclaredFields()) {
      KVIndex idx = f.getAnnotation(KVIndex.class);
      if (idx != null) {
        checkIndex(idx, indices);
        indices.put(idx.value(), idx);
        f.setAccessible(true);
        accessors.put(idx.value(), new FieldAccessor(f));
      }
    }

    for (Method m : type.getDeclaredMethods()) {
      KVIndex idx = m.getAnnotation(KVIndex.class);
      if (idx != null) {
        checkIndex(idx, indices);
        Preconditions.checkArgument(m.getParameterTypes().length == 0,
          "Annotated method %s::%s should not have any parameters.", type.getName(), m.getName());
        indices.put(idx.value(), idx);
        m.setAccessible(true);
        accessors.put(idx.value(), new MethodAccessor(m));
      }
    }

    Preconditions.checkArgument(indices.containsKey(KVIndex.NATURAL_INDEX_NAME),
        "No natural index defined for type %s.", type.getName());
    Preconditions.checkArgument(indices.get(KVIndex.NATURAL_INDEX_NAME).parent().isEmpty(),
        "Natural index of %s cannot have a parent.", type.getName());

    // Parent references can only be checked once every index has been collected.
    for (KVIndex idx : indices.values()) {
      if (!idx.parent().isEmpty()) {
        KVIndex parent = indices.get(idx.parent());
        Preconditions.checkArgument(parent != null,
          "Cannot find parent %s of index %s.", idx.parent(), idx.value());
        Preconditions.checkArgument(parent.parent().isEmpty(),
          "Parent index %s of index %s cannot be itself a child index.", idx.parent(), idx.value());
      }
    }
  }

  /** Validates a single index declaration against the indices collected so far. */
  private void checkIndex(KVIndex idx, Map<String, KVIndex> indices) {
    Preconditions.checkArgument(idx.value() != null && !idx.value().isEmpty(),
      "No name provided for index in type %s.", type.getName());
    Preconditions.checkArgument(
      !idx.value().startsWith("_") || idx.value().equals(KVIndex.NATURAL_INDEX_NAME),
      "Index name %s (in type %s) is not allowed.", idx.value(), type.getName());
    Preconditions.checkArgument(idx.parent().isEmpty() || !idx.parent().equals(idx.value()),
      "Index %s cannot be parent of itself.", idx.value());
    Preconditions.checkArgument(!indices.containsKey(idx.value()),
      "Duplicate index %s for type %s.", idx.value(), type.getName());
  }

  public Class<?> type() {
    return type;
  }

  /** Reads the value of the named index from {@code instance}. */
  public Object getIndexValue(String indexName, Object instance) throws Exception {
    return getAccessor(indexName).get(instance);
  }

  public Stream<KVIndex> indices() {
    return indices.values().stream();
  }

  /**
   * Returns the accessor for the named index.
   *
   * @throws IllegalArgumentException If the type has no index with that name.
   */
  Accessor getAccessor(String indexName) {
    Accessor a = accessors.get(indexName);
    Preconditions.checkArgument(a != null, "No index %s.", indexName);
    return a;
  }

  /**
   * Returns the accessor for the parent of the named index, or null if the index has no parent.
   *
   * @throws IllegalArgumentException If the type has no index with that name.
   */
  Accessor getParentAccessor(String indexName) {
    KVIndex index = indices.get(indexName);
    // Fail the same way getAccessor() does for unknown names, rather than with an NPE.
    Preconditions.checkArgument(index != null, "No index %s.", indexName);
    return index.parent().isEmpty() ? null : getAccessor(index.parent());
  }

  /**
   * Abstracts the difference between invoking a Field and a Method.
   */
  interface Accessor {

    Object get(Object instance) throws Exception;

  }

  /** Accessor that reads an annotated field. */
  private static final class FieldAccessor implements Accessor {

    private final Field field;

    FieldAccessor(Field field) {
      this.field = field;
    }

    @Override
    public Object get(Object instance) throws Exception {
      return field.get(instance);
    }

  }

  /** Accessor that invokes an annotated no-argument method. */
  private static final class MethodAccessor implements Accessor {

    private final Method method;

    MethodAccessor(Method method) {
      this.method = method;
    }

    @Override
    public Object get(Object instance) throws Exception {
      return method.invoke(instance);
    }

  }

}
http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java new file mode 100644 index 0000000..310febc --- /dev/null +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.kvstore; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import org.fusesource.leveldbjni.JniDBFactory; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.Options; +import org.iq80.leveldb.WriteBatch; + +import org.apache.spark.annotation.Private; + +/** + * Implementation of KVStore that uses LevelDB as the underlying data store. + */ +@Private +public class LevelDB implements KVStore { + + @VisibleForTesting + static final long STORE_VERSION = 1L; + + @VisibleForTesting + static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8); + + /** DB key where app metadata is stored. */ + private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8); + + /** DB key where type aliases are stored. */ + private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8); + + final AtomicReference<DB> _db; + final KVStoreSerializer serializer; + + /** + * Keep a mapping of class names to a shorter, unique ID managed by the store. This serves two + * purposes: make the keys stored on disk shorter, and spread out the keys, since class names + * will often have a long, redundant prefix (think "org.apache.spark."). 
+ */ + private final ConcurrentMap<String, byte[]> typeAliases; + private final ConcurrentMap<Class<?>, LevelDBTypeInfo> types; + + public LevelDB(File path) throws Exception { + this(path, new KVStoreSerializer()); + } + + /** + * Opens the LevelDB store at the given path, checks the stored version against STORE_VERSION + * (writing it when no version is stored yet) and loads the persisted type aliases. + * + * @throws UnsupportedStoreVersionException if the stored version differs from STORE_VERSION. + */ + public LevelDB(File path, KVStoreSerializer serializer) throws Exception { + this.serializer = serializer; + this.types = new ConcurrentHashMap<>(); + + Options options = new Options(); + options.createIfMissing(!path.exists()); + this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options)); + + byte[] versionData = db().get(STORE_VERSION_KEY); + if (versionData != null) { + long version = serializer.deserializeLong(versionData); + if (version != STORE_VERSION) { + // Release the handle opened above: the constructor fails, so no caller can close it. + close(); + throw new UnsupportedStoreVersionException(); + } + } else { + db().put(STORE_VERSION_KEY, serializer.serialize(STORE_VERSION)); + } + + Map<String, byte[]> aliases; + try { + aliases = get(TYPE_ALIASES_KEY, TypeAliases.class).aliases; + } catch (NoSuchElementException e) { + aliases = new HashMap<>(); + } + typeAliases = new ConcurrentHashMap<>(aliases); + } + + @Override + public <T> T getMetadata(Class<T> klass) throws Exception { + try { + return get(METADATA_KEY, klass); + } catch (NoSuchElementException nsee) { + return null; + } + } + + @Override + public void setMetadata(Object value) throws Exception { + if (value != null) { + put(METADATA_KEY, value); + } else { + db().delete(METADATA_KEY); + } + } + + <T> T get(byte[] key, Class<T> klass) throws Exception { + byte[] data = db().get(key); + if (data == null) { + throw new NoSuchElementException(new String(key, UTF_8)); + } + return serializer.deserialize(data, klass); + } + + private void put(byte[] key, Object value) throws Exception { + Preconditions.checkArgument(value != null, "Null values are not allowed."); + db().put(key, serializer.serialize(value)); + } + + @Override + public <T> T read(Class<T> klass, Object naturalKey) throws Exception { + Preconditions.checkArgument(naturalKey != null, "Null keys 
are not allowed."); + byte[] key = getTypeInfo(klass).naturalIndex().start(null, naturalKey); + return get(key, klass); + } + + @Override + public void write(Object value) throws Exception { + Preconditions.checkArgument(value != null, "Null values are not allowed."); + LevelDBTypeInfo ti = getTypeInfo(value.getClass()); + + try (WriteBatch batch = db().createWriteBatch()) { + byte[] data = serializer.serialize(value); + synchronized (ti) { + Object existing; + try { + existing = get(ti.naturalIndex().entityKey(null, value), value.getClass()); + } catch (NoSuchElementException e) { + existing = null; + } + + PrefixCache cache = new PrefixCache(value); + byte[] naturalKey = ti.naturalIndex().toKey(ti.naturalIndex().getValue(value)); + for (LevelDBTypeInfo.Index idx : ti.indices()) { + byte[] prefix = cache.getPrefix(idx); + idx.add(batch, value, existing, data, naturalKey, prefix); + } + db().write(batch); + } + } + } + + @Override + public void delete(Class<?> type, Object naturalKey) throws Exception { + Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed."); + try (WriteBatch batch = db().createWriteBatch()) { + LevelDBTypeInfo ti = getTypeInfo(type); + byte[] key = ti.naturalIndex().start(null, naturalKey); + synchronized (ti) { + byte[] data = db().get(key); + if (data != null) { + Object existing = serializer.deserialize(data, type); + PrefixCache cache = new PrefixCache(existing); + byte[] keyBytes = ti.naturalIndex().toKey(ti.naturalIndex().getValue(existing)); + for (LevelDBTypeInfo.Index idx : ti.indices()) { + idx.remove(batch, existing, keyBytes, cache.getPrefix(idx)); + } + db().write(batch); + } + } + } catch (NoSuchElementException nse) { + // Ignore. 
+ } + } + + @Override + public <T> KVStoreView<T> view(Class<T> type) throws Exception { + return new KVStoreView<T>(type) { + @Override + public Iterator<T> iterator() { + try { + return new LevelDBIterator<>(LevelDB.this, this); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + }; + } + + @Override + public long count(Class<?> type) throws Exception { + LevelDBTypeInfo.Index idx = getTypeInfo(type).naturalIndex(); + return idx.getCount(idx.end(null)); + } + + @Override + public long count(Class<?> type, String index, Object indexedValue) throws Exception { + LevelDBTypeInfo.Index idx = getTypeInfo(type).index(index); + return idx.getCount(idx.end(null, indexedValue)); + } + + @Override + public void close() throws IOException { + DB _db = this._db.getAndSet(null); + if (_db == null) { + return; + } + + try { + _db.close(); + } catch (IOException ioe) { + throw ioe; + } catch (Exception e) { + throw new IOException(e.getMessage(), e); + } + } + + /** Returns metadata about indices for the given type. */ + LevelDBTypeInfo getTypeInfo(Class<?> type) throws Exception { + LevelDBTypeInfo ti = types.get(type); + if (ti == null) { + LevelDBTypeInfo tmp = new LevelDBTypeInfo(this, type, getTypeAlias(type)); + ti = types.putIfAbsent(type, tmp); + if (ti == null) { + ti = tmp; + } + } + return ti; + } + + /** + * Try to avoid use-after close since that has the tendency of crashing the JVM. This doesn't + * prevent methods that retrieved the instance from using it after close, but hopefully will + * catch most cases; otherwise, we'll need some kind of locking. 
+ */ + DB db() { + DB _db = this._db.get(); + if (_db == null) { + throw new IllegalStateException("DB is closed."); + } + return _db; + } + + private byte[] getTypeAlias(Class<?> klass) throws Exception { + byte[] alias = typeAliases.get(klass.getName()); + if (alias == null) { + synchronized (typeAliases) { + byte[] tmp = String.valueOf(typeAliases.size()).getBytes(UTF_8); + alias = typeAliases.putIfAbsent(klass.getName(), tmp); + if (alias == null) { + alias = tmp; + put(TYPE_ALIASES_KEY, new TypeAliases(typeAliases)); + } + } + } + return alias; + } + + /** Needs to be public for Jackson. */ + public static class TypeAliases { + + public Map<String, byte[]> aliases; + + TypeAliases(Map<String, byte[]> aliases) { + this.aliases = aliases; + } + + TypeAliases() { + this(null); + } + + } + + private static class PrefixCache { + + private final Object entity; + private final Map<LevelDBTypeInfo.Index, byte[]> prefixes; + + PrefixCache(Object entity) { + this.entity = entity; + this.prefixes = new HashMap<>(); + } + + byte[] getPrefix(LevelDBTypeInfo.Index idx) throws Exception { + byte[] prefix = null; + if (idx.isChild()) { + prefix = prefixes.get(idx.parent()); + if (prefix == null) { + prefix = idx.parent().childPrefix(idx.parent().getValue(entity)); + prefixes.put(idx.parent(), prefix); + } + } + return prefix; + } + + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java new file mode 100644 index 0000000..a2181f3 --- /dev/null +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.kvstore; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import org.iq80.leveldb.DBIterator; + +class LevelDBIterator<T> implements KVStoreIterator<T> { + + private final LevelDB db; + private final boolean ascending; + private final DBIterator it; + private final Class<T> type; + private final LevelDBTypeInfo ti; + private final LevelDBTypeInfo.Index index; + private final byte[] indexKeyPrefix; + private final byte[] end; + private final long max; + + private boolean checkedNext; + private byte[] next; + private boolean closed; + private long count; + + LevelDBIterator(LevelDB db, KVStoreView<T> params) throws Exception { + this.db = db; + this.ascending = params.ascending; + this.it = db.db().iterator(); + this.type = params.type; + this.ti = db.getTypeInfo(type); + this.index = ti.index(params.index); + this.max = params.max; + + Preconditions.checkArgument(!index.isChild() || params.parent != null, + "Cannot iterate over child index %s without parent value.", params.index); + byte[] 
parent = index.isChild() ? index.parent().childPrefix(params.parent) : null; + + this.indexKeyPrefix = index.keyPrefix(parent); + + byte[] firstKey; + if (params.first != null) { + if (ascending) { + firstKey = index.start(parent, params.first); + } else { + firstKey = index.end(parent, params.first); + } + } else if (ascending) { + firstKey = index.keyPrefix(parent); + } else { + firstKey = index.end(parent); + } + it.seek(firstKey); + + byte[] end = null; + if (ascending) { + if (params.last != null) { + end = index.end(parent, params.last); + } else { + end = index.end(parent); + } + } else { + if (params.last != null) { + end = index.start(parent, params.last); + } + if (it.hasNext()) { + // When descending, the caller may have set up the start of iteration at a non-existent + // entry that is guaranteed to be after the desired entry. For example, if you have a + // compound key (a, b) where b is an integer, you may seek to the end of the elements that + // have the same "a" value by specifying Integer.MAX_VALUE for "b", and that value may not + // exist in the database. So need to check here whether the next value actually belongs to + // the set being returned by the iterator before advancing. 
+ byte[] nextKey = it.peekNext().getKey(); + if (compare(nextKey, indexKeyPrefix) <= 0) { + it.next(); + } + } + } + this.end = end; + + if (params.skip > 0) { + skip(params.skip); + } + } + + @Override + public boolean hasNext() { + if (!checkedNext && !closed) { + next = loadNext(); + checkedNext = true; + } + if (!closed && next == null) { + try { + close(); + } catch (IOException ioe) { + throw Throwables.propagate(ioe); + } + } + return next != null; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + checkedNext = false; + + try { + T ret; + if (index == null || index.isCopy()) { + ret = db.serializer.deserialize(next, type); + } else { + byte[] key = ti.buildKey(false, ti.naturalIndex().keyPrefix(null), next); + ret = db.get(key, type); + } + next = null; + return ret; + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public List<T> next(int max) { + List<T> list = new ArrayList<>(max); + while (hasNext() && list.size() < max) { + list.add(next()); + } + return list; + } + + @Override + public boolean skip(long n) { + long skipped = 0; + while (skipped < n) { + if (next != null) { + checkedNext = false; + next = null; + skipped++; + continue; + } + + boolean hasNext = ascending ? it.hasNext() : it.hasPrev(); + if (!hasNext) { + checkedNext = true; + return false; + } + + Map.Entry<byte[], byte[]> e = ascending ? it.next() : it.prev(); + if (!isEndMarker(e.getKey())) { + skipped++; + } + } + + return hasNext(); + } + + @Override + public synchronized void close() throws IOException { + if (!closed) { + it.close(); + closed = true; + } + } + + private byte[] loadNext() { + if (count >= max) { + return null; + } + + try { + while (true) { + boolean hasNext = ascending ? 
it.hasNext() : it.hasPrev(); + if (!hasNext) { + return null; + } + + Map.Entry<byte[], byte[]> nextEntry; + try { + // Avoid races if another thread is updating the DB. + nextEntry = ascending ? it.next() : it.prev(); + } catch (NoSuchElementException e) { + return null; + } + + byte[] nextKey = nextEntry.getKey(); + // Next key is not part of the index, stop. + if (!startsWith(nextKey, indexKeyPrefix)) { + return null; + } + + // If the next key is an end marker, then skip it. + if (isEndMarker(nextKey)) { + continue; + } + + // If there's a known end key and iteration has gone past it, stop. + if (end != null) { + int comp = compare(nextKey, end) * (ascending ? 1 : -1); + if (comp > 0) { + return null; + } + } + + count++; + + // Next element is part of the iteration, return it. + return nextEntry.getValue(); + } + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @VisibleForTesting + static boolean startsWith(byte[] key, byte[] prefix) { + if (key.length < prefix.length) { + return false; + } + + for (int i = 0; i < prefix.length; i++) { + if (key[i] != prefix[i]) { + return false; + } + } + + return true; + } + + private boolean isEndMarker(byte[] key) { + return (key.length > 2 && + key[key.length - 2] == LevelDBTypeInfo.KEY_SEPARATOR && + key[key.length - 1] == LevelDBTypeInfo.END_MARKER[0]); + } + + /** + * Lexicographically compares two keys byte by byte, treating each byte as unsigned. If one + * key is a prefix of the other, the shorter key sorts first. + * + * @return a negative number, zero or a positive number if a sorts before, equal to or + * after b. + */ + static int compare(byte[] a, byte[] b) { + int minLen = Math.min(a.length, b.length); + for (int i = 0; i < minLen; i++) { + // Mask to 0..255: Java bytes are signed, but LevelDB's default comparator orders keys + // as unsigned bytes, and this method must agree with the iteration order. + int diff = (a[i] & 0xff) - (b[i] & 0xff); + if (diff != 0) { + return diff; + } + } + + return a.length - b.length; + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java new file mode 
100644 index 0000000..93aa0bb --- /dev/null +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.kvstore; + +import java.lang.reflect.Array; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.base.Preconditions; +import org.iq80.leveldb.WriteBatch; + +/** + * Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected + * via reflection, to make it cheaper to access it multiple times. + * + * <p> + * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures + * that iteration over indices is easy, and that updating values in the store is not overly + * expensive. Of note, indices choose using more disk space (one value per key) instead of keeping + * lists of pointers, which would be more expensive to update at runtime. + * </p> + * + * <p> + * Indentation defines when a sub-key lives under a parent key. 
In LevelDB, this means the full + * key would be the concatenation of everything up to that point in the hierarchy, with each + * component separated by a NULL byte. + * </p> + * + * <pre> + * +TYPE_NAME + * NATURAL_INDEX + * +NATURAL_KEY + * - + * -NATURAL_INDEX + * INDEX_NAME + * +INDEX_VALUE + * +NATURAL_KEY + * -INDEX_VALUE + * .INDEX_VALUE + * CHILD_INDEX_NAME + * +CHILD_INDEX_VALUE + * NATURAL_KEY_OR_DATA + * - + * -INDEX_NAME + * </pre> + * + * <p> + * Entity data (either the entity's natural key or a copy of the data) is stored in all keys + * that end with "+<something>". A count of all objects that match a particular top-level index + * value is kept at the end marker ("-<something>"). A count is also kept at the natural index's end + * marker, to make it easy to retrieve the number of all elements of a particular type. + * </p> + * + * <p> + * To illustrate, given a type "Foo", with a natural index and a second index called "bar", you'd + * have these keys and values in the store for two instances, one with natural key "key1" and the + * other "key2", both with value "yes" for "bar": + * </p> + * + * <pre> + * Foo __main__ +key1 [data for instance 1] + * Foo __main__ +key2 [data for instance 2] + * Foo __main__ - [count of all Foo] + * Foo bar +yes +key1 [instance 1 key or data, depending on index type] + * Foo bar +yes +key2 [instance 2 key or data, depending on index type] + * Foo bar +yes - [count of all Foo with "bar=yes" ] + * </pre> + * + * <p> + * Note that all indexed values are prepended with "+", even if the index itself does not have an + * explicit end marker. This allows for easily skipping to the end of an index by telling LevelDB + * to seek to the "phantom" end marker of the index. Throughout the code and comments, this part + * of the full LevelDB key is generally referred to as the "index value" of the entity. + * </p> + * + * <p> + * Child indices are stored after their parent index. 
In the example above, let's assume there is + * a child index "child", whose parent is "bar". If both instances have value "no" for this field, + * the data in the store would look something like the following: + * </p> + * + * <pre> + * ... + * Foo bar +yes - + * Foo bar .yes .child +no +key1 [instance 1 key or data, depending on index type] + * Foo bar .yes .child +no +key2 [instance 2 key or data, depending on index type] + * ... + * </pre> + */ +class LevelDBTypeInfo { + + static final byte[] END_MARKER = new byte[] { '-' }; + static final byte ENTRY_PREFIX = (byte) '+'; + static final byte KEY_SEPARATOR = 0x0; + static byte TRUE = (byte) '1'; + static byte FALSE = (byte) '0'; + + private static final byte SECONDARY_IDX_PREFIX = (byte) '.'; + private static final byte POSITIVE_MARKER = (byte) '='; + private static final byte NEGATIVE_MARKER = (byte) '*'; + private static final byte[] HEX_BYTES = new byte[] { + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' + }; + + private final LevelDB db; + private final Class<?> type; + private final Map<String, Index> indices; + private final byte[] typePrefix; + + LevelDBTypeInfo(LevelDB db, Class<?> type, byte[] alias) throws Exception { + this.db = db; + this.type = type; + this.indices = new HashMap<>(); + + KVTypeInfo ti = new KVTypeInfo(type); + + // First create the parent indices, then the child indices. 
+ ti.indices().forEach(idx -> { + if (idx.parent().isEmpty()) { + indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()), null)); + } + }); + ti.indices().forEach(idx -> { + if (!idx.parent().isEmpty()) { + indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()), + indices.get(idx.parent()))); + } + }); + + this.typePrefix = alias; + } + + Class<?> type() { + return type; + } + + byte[] keyPrefix() { + return typePrefix; + } + + Index naturalIndex() { + return index(KVIndex.NATURAL_INDEX_NAME); + } + + Index index(String name) { + Index i = indices.get(name); + Preconditions.checkArgument(i != null, "Index %s does not exist for type %s.", name, + type.getName()); + return i; + } + + Collection<Index> indices() { + return indices.values(); + } + + byte[] buildKey(byte[]... components) { + return buildKey(true, components); + } + + byte[] buildKey(boolean addTypePrefix, byte[]... components) { + int len = 0; + if (addTypePrefix) { + len += typePrefix.length + 1; + } + for (byte[] comp : components) { + len += comp.length; + } + len += components.length - 1; + + byte[] dest = new byte[len]; + int written = 0; + + if (addTypePrefix) { + System.arraycopy(typePrefix, 0, dest, 0, typePrefix.length); + dest[typePrefix.length] = KEY_SEPARATOR; + written += typePrefix.length + 1; + } + + for (byte[] comp : components) { + System.arraycopy(comp, 0, dest, written, comp.length); + written += comp.length; + if (written < dest.length) { + dest[written] = KEY_SEPARATOR; + written++; + } + } + + return dest; + } + + /** + * Models a single index in LevelDB. See top-level class's javadoc for a description of how the + * keys are generated. 
+ */ + class Index { + + private final boolean copy; + private final boolean isNatural; + private final byte[] name; + private final KVTypeInfo.Accessor accessor; + private final Index parent; + + /** + * Builds the index described by the given annotation. + * + * @param self The index annotation; its value is the index name. + * @param accessor Reads the indexed value from an entity. + * @param parent The parent index, or null if this is not a child index. + */ + private Index(KVIndex self, KVTypeInfo.Accessor accessor, Index parent) { + byte[] name = self.value().getBytes(UTF_8); + if (parent != null) { + byte[] child = new byte[name.length + 1]; + child[0] = SECONDARY_IDX_PREFIX; + System.arraycopy(name, 0, child, 1, name.length); + // Actually use the prefixed name; it was previously computed and then discarded. + // NOTE(review): this changes the stored key layout for child indices. + name = child; + } + + this.name = name; + this.isNatural = self.value().equals(KVIndex.NATURAL_INDEX_NAME); + this.copy = isNatural || self.copy(); + this.accessor = accessor; + this.parent = parent; + } + + boolean isCopy() { + return copy; + } + + boolean isChild() { + return parent != null; + } + + Index parent() { + return parent; + } + + /** + * Creates a key prefix for child indices of this index. This allows the prefix to be + * calculated only once, avoiding redundant work when multiple child indices of the + * same parent index exist. + */ + byte[] childPrefix(Object value) throws Exception { + Preconditions.checkState(parent == null, "Not a parent index."); + return buildKey(name, toParentKey(value)); + } + + /** + * Gets the index value for a particular entity (which is the value of the field or method + * tagged with the index annotation). This is used as part of the LevelDB key where the + * entity (or its id) is stored. + */ + Object getValue(Object entity) throws Exception { + return accessor.get(entity); + } + + private void checkParent(byte[] prefix) { + if (prefix != null) { + Preconditions.checkState(parent != null, "Parent prefix provided for parent index."); + } else { + Preconditions.checkState(parent == null, "Parent prefix missing for child index."); + } + } + + /** The prefix for all keys that belong to this index. */ + byte[] keyPrefix(byte[] prefix) { + checkParent(prefix); + return (parent != null) ? 
buildKey(false, prefix, name) : buildKey(name); + } + + /** + * The key where to start ascending iteration for entities whose value for the indexed field + * match the given value. + */ + byte[] start(byte[] prefix, Object value) { + checkParent(prefix); + return (parent != null) ? buildKey(false, prefix, name, toKey(value)) + : buildKey(name, toKey(value)); + } + + /** The key for the index's end marker. */ + byte[] end(byte[] prefix) { + checkParent(prefix); + return (parent != null) ? buildKey(false, prefix, name, END_MARKER) + : buildKey(name, END_MARKER); + } + + /** The key for the end marker for entries with the given value. */ + byte[] end(byte[] prefix, Object value) throws Exception { + checkParent(prefix); + return (parent != null) ? buildKey(false, prefix, name, toKey(value), END_MARKER) + : buildKey(name, toKey(value), END_MARKER); + } + + /** The full key in the index that identifies the given entity. */ + byte[] entityKey(byte[] prefix, Object entity) throws Exception { + Object indexValue = getValue(entity); + Preconditions.checkNotNull(indexValue, "Null index value for %s in type %s.", + name, type.getName()); + byte[] entityKey = start(prefix, indexValue); + if (!isNatural) { + entityKey = buildKey(false, entityKey, toKey(naturalIndex().getValue(entity))); + } + return entityKey; + } + + private void updateCount(WriteBatch batch, byte[] key, long delta) throws Exception { + long updated = getCount(key) + delta; + if (updated > 0) { + batch.put(key, db.serializer.serialize(updated)); + } else { + batch.delete(key); + } + } + + private void addOrRemove( + WriteBatch batch, + Object entity, + Object existing, + byte[] data, + byte[] naturalKey, + byte[] prefix) throws Exception { + Object indexValue = getValue(entity); + Preconditions.checkNotNull(indexValue, "Null index value for %s in type %s.", + name, type.getName()); + + byte[] entityKey = start(prefix, indexValue); + if (!isNatural) { + entityKey = buildKey(false, entityKey, naturalKey); + } + 
+ boolean needCountUpdate = (existing == null); + + // Check whether there's a need to update the index. The index needs to be updated in two + // cases: + // + // - There is no existing value for the entity, so a new index value will be added. + // - If there is a previously stored value for the entity, and the index value for the + // current index does not match the new value, the old entry needs to be deleted and + // the new one added. + // + // Natural indices don't need to be checked, because by definition both old and new entities + // will have the same key. The put() call is all that's needed in that case. + // + // Also check whether we need to update the counts. If the indexed value is changing, we + // need to decrement the count at the old index value, and the new indexed value count needs + // to be incremented. + if (existing != null && !isNatural) { + byte[] oldPrefix = null; + Object oldIndexedValue = getValue(existing); + boolean removeExisting = !indexValue.equals(oldIndexedValue); + if (!removeExisting && isChild()) { + oldPrefix = parent().childPrefix(parent().getValue(existing)); + removeExisting = LevelDBIterator.compare(prefix, oldPrefix) != 0; + } + + if (removeExisting) { + if (oldPrefix == null && isChild()) { + oldPrefix = parent().childPrefix(parent().getValue(existing)); + } + + byte[] oldKey = entityKey(oldPrefix, existing); + batch.delete(oldKey); + + // If the indexed value has changed, we need to update the counts at the old and new + // end markers for the indexed value. + if (!isChild()) { + byte[] oldCountKey = end(null, oldIndexedValue); + updateCount(batch, oldCountKey, -1L); + needCountUpdate = true; + } + } + } + + if (data != null) { + byte[] stored = copy ? data : naturalKey; + batch.put(entityKey, stored); + } else { + batch.delete(entityKey); + } + + if (needCountUpdate && !isChild()) { + long delta = data != null ? 1L : -1L; + byte[] countKey = isNatural ? 
end(prefix) : end(prefix, indexValue); + updateCount(batch, countKey, delta); + } + } + + /** + * Add an entry to the index. + * + * @param batch Write batch with other related changes. + * @param entity The entity being added to the index. + * @param existing The entity being replaced in the index, or null. + * @param data Serialized entity to store (when storing the entity, not a reference). + * @param naturalKey The value's natural key (to avoid re-computing it for every index). + * @param prefix The parent index prefix, if this is a child index. + */ + void add( + WriteBatch batch, + Object entity, + Object existing, + byte[] data, + byte[] naturalKey, + byte[] prefix) throws Exception { + addOrRemove(batch, entity, existing, data, naturalKey, prefix); + } + + /** + * Remove a value from the index. + * + * @param batch Write batch with other related changes. + * @param entity The entity being removed, to identify the index entry to modify. + * @param naturalKey The value's natural key (to avoid re-computing it for every index). + * @param prefix The parent index prefix, if this is a child index. + */ + void remove( + WriteBatch batch, + Object entity, + byte[] naturalKey, + byte[] prefix) throws Exception { + addOrRemove(batch, entity, null, null, naturalKey, prefix); + } + + long getCount(byte[] key) throws Exception { + byte[] data = db.db().get(key); + return data != null ? db.serializer.deserializeLong(data) : 0; + } + + byte[] toParentKey(Object value) { + return toKey(value, SECONDARY_IDX_PREFIX); + } + + byte[] toKey(Object value) { + return toKey(value, ENTRY_PREFIX); + } + + /** + * Translates a value to be used as part of the store key. + * + * Integral numbers are encoded as a string in a way that preserves lexicographical + * ordering. 
The string is prepended with a marker telling whether the number is negative + * or positive ("*" for negative and "=" for positive are used since "-" and "+" have the + * opposite of the desired order), and then the number is encoded into a hex string (so + * it occupies twice the number of bytes as the original type). Zero is encoded with the + * positive marker. + * + * Arrays are encoded by encoding each element separately, separated by KEY_SEPARATOR. + */ + byte[] toKey(Object value, byte prefix) { + final byte[] result; + + if (value instanceof String) { + byte[] str = ((String) value).getBytes(UTF_8); + result = new byte[str.length + 1]; + result[0] = prefix; + System.arraycopy(str, 0, result, 1, str.length); + } else if (value instanceof Boolean) { + result = new byte[] { prefix, (Boolean) value ? TRUE : FALSE }; + } else if (value.getClass().isArray()) { + int length = Array.getLength(value); + byte[][] components = new byte[length][]; + for (int i = 0; i < length; i++) { + components[i] = toKey(Array.get(value, i)); + } + result = buildKey(false, components); + } else { + int bytes; + + if (value instanceof Integer) { + bytes = Integer.SIZE; + } else if (value instanceof Long) { + bytes = Long.SIZE; + } else if (value instanceof Short) { + bytes = Short.SIZE; + } else if (value instanceof Byte) { + bytes = Byte.SIZE; + } else { + throw new IllegalArgumentException(String.format("Type %s not allowed as key.", + value.getClass().getName())); + } + + bytes = bytes / Byte.SIZE; + + byte[] key = new byte[bytes * 2 + 2]; + long longValue = ((Number) value).longValue(); + key[0] = prefix; + // Zero must take the positive marker: with the negative marker its all-zero hex digits + // would sort before every negative value. + key[1] = longValue >= 0 ? 
POSITIVE_MARKER : NEGATIVE_MARKER; + + for (int i = 0; i < key.length - 2; i++) { + int masked = (int) ((longValue >>> (4 * i)) & 0xF); + key[key.length - i - 1] = HEX_BYTES[masked]; + } + + result = key; + } + + return result; + } + + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/main/java/org/apache/spark/util/kvstore/UnsupportedStoreVersionException.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/UnsupportedStoreVersionException.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/UnsupportedStoreVersionException.java new file mode 100644 index 0000000..75a33b7 --- /dev/null +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/UnsupportedStoreVersionException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.kvstore; + +import java.io.IOException; + +import org.apache.spark.annotation.Private; + +/** + * Exception thrown when the store implementation is not compatible with the underlying data. 
+ */ +@Private +public class UnsupportedStoreVersionException extends IOException { + +} http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java deleted file mode 100644 index d5938ac..0000000 --- a/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.kvstore; - -import java.util.Arrays; - -public class ArrayKeyIndexType { - - @KVIndex - public int[] key; - - @KVIndex("id") - public String[] id; - - @Override - public boolean equals(Object o) { - if (o instanceof ArrayKeyIndexType) { - ArrayKeyIndexType other = (ArrayKeyIndexType) o; - return Arrays.equals(key, other.key) && Arrays.equals(id, other.id); - } - return false; - } - - @Override - public int hashCode() { - return key.hashCode(); - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayWrappersSuite.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayWrappersSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayWrappersSuite.java deleted file mode 100644 index f9b4774..0000000 --- a/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayWrappersSuite.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.kvstore; - -import org.junit.Test; -import static org.junit.Assert.*; - -public class ArrayWrappersSuite { - - @Test - public void testGenericArrayKey() { - byte[] b1 = new byte[] { 0x01, 0x02, 0x03 }; - byte[] b2 = new byte[] { 0x01, 0x02 }; - int[] i1 = new int[] { 1, 2, 3 }; - int[] i2 = new int[] { 1, 2 }; - String[] s1 = new String[] { "1", "2", "3" }; - String[] s2 = new String[] { "1", "2" }; - - assertEquals(ArrayWrappers.forArray(b1), ArrayWrappers.forArray(b1)); - assertNotEquals(ArrayWrappers.forArray(b1), ArrayWrappers.forArray(b2)); - assertNotEquals(ArrayWrappers.forArray(b1), ArrayWrappers.forArray(i1)); - assertNotEquals(ArrayWrappers.forArray(b1), ArrayWrappers.forArray(s1)); - - assertEquals(ArrayWrappers.forArray(i1), ArrayWrappers.forArray(i1)); - assertNotEquals(ArrayWrappers.forArray(i1), ArrayWrappers.forArray(i2)); - assertNotEquals(ArrayWrappers.forArray(i1), ArrayWrappers.forArray(b1)); - assertNotEquals(ArrayWrappers.forArray(i1), ArrayWrappers.forArray(s1)); - - assertEquals(ArrayWrappers.forArray(s1), ArrayWrappers.forArray(s1)); - assertNotEquals(ArrayWrappers.forArray(s1), ArrayWrappers.forArray(s2)); - assertNotEquals(ArrayWrappers.forArray(s1), ArrayWrappers.forArray(b1)); - assertNotEquals(ArrayWrappers.forArray(s1), ArrayWrappers.forArray(i1)); - - assertEquals(0, ArrayWrappers.forArray(b1).compareTo(ArrayWrappers.forArray(b1))); - assertTrue(ArrayWrappers.forArray(b1).compareTo(ArrayWrappers.forArray(b2)) > 0); - - assertEquals(0, ArrayWrappers.forArray(i1).compareTo(ArrayWrappers.forArray(i1))); - assertTrue(ArrayWrappers.forArray(i1).compareTo(ArrayWrappers.forArray(i2)) > 0); - - assertEquals(0, ArrayWrappers.forArray(s1).compareTo(ArrayWrappers.forArray(s1))); - assertTrue(ArrayWrappers.forArray(s1).compareTo(ArrayWrappers.forArray(s2)) > 0); - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/test/java/org/apache/spark/kvstore/CustomType1.java 
---------------------------------------------------------------------- diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/CustomType1.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/CustomType1.java deleted file mode 100644 index afb72b8..0000000 --- a/common/kvstore/src/test/java/org/apache/spark/kvstore/CustomType1.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.kvstore; - -import com.google.common.base.Objects; - -public class CustomType1 { - - @KVIndex - public String key; - - @KVIndex("id") - public String id; - - @KVIndex(value = "name", copy = true) - public String name; - - @KVIndex("int") - public int num; - - @KVIndex(value = "child", parent = "id") - public String child; - - @Override - public boolean equals(Object o) { - if (o instanceof CustomType1) { - CustomType1 other = (CustomType1) o; - return id.equals(other.id) && name.equals(other.name); - } - return false; - } - - @Override - public int hashCode() { - return id.hashCode(); - } - - @Override - public String toString() { - return Objects.toStringHelper(this) - .add("key", key) - .add("id", id) - .add("name", name) - .add("num", num) - .toString(); - } - -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org