Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17902#discussion_r119651685
  
    --- Diff: 
common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java ---
    @@ -0,0 +1,502 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.kvstore;
    +
    +import java.lang.reflect.Array;
    +import java.lang.reflect.Field;
    +import java.lang.reflect.Method;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import static java.nio.charset.StandardCharsets.UTF_8;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.base.Throwables;
    +import org.iq80.leveldb.WriteBatch;
    +
    +/**
    + * Holds metadata about app-specific types stored in LevelDB. Serves as a 
cache for data collected
    + * via reflection, to make it cheaper to access it multiple times.
    + *
    + * <p>
    + * The hierarchy of keys stored in LevelDB looks roughly like the 
following. This hierarchy ensures
    + * that iteration over indices is easy, and that updating values in the 
store is not overly
    + * expensive. Of note, indices choose using more disk space (one value per 
key) instead of keeping
    + * lists of pointers, which would be more expensive to update at runtime.
    + * </p>
    + *
    + * <p>
    + * Indentation defines when a sub-key lives under a parent key. In 
LevelDB, this means the full
    + * key would be the concatenation of everything up to that point in the 
hierarchy, with each
    + * component separated by a NULL byte.
    + * </p>
    + *
    + * <pre>
    + * +TYPE_NAME
    + *   NATURAL_INDEX
    + *     +NATURAL_KEY
    + *     -
    + *   -NATURAL_INDEX
    + *   INDEX_NAME
    + *     +INDEX_VALUE
    + *       +NATURAL_KEY
    + *     -INDEX_VALUE
    + *     .INDEX_VALUE
    + *       CHILD_INDEX_NAME
    + *         +CHILD_INDEX_VALUE
    + *           NATURAL_KEY_OR_DATA
    + *         -
    + *   -INDEX_NAME
    + * </pre>
    + *
    + * <p>
    + * Entity data (either the entity's natural key or a copy of the data) is 
stored in all keys
    + * that end with "+<something>". A count of all objects that match a 
particular top-level index
    + * value is kept at the end marker ("-<something>"). A count is also kept 
at the natural index's end
    + * marker, to make it easy to retrieve the number of all elements of a 
particular type.
    + * </p>
    + *
    + * <p>
    + * To illustrate, given a type "Foo", with a natural index and a second 
index called "bar", you'd
    + * have these keys and values in the store for two instances, one with 
natural key "key1" and the
    + * other "key2", both with value "yes" for "bar":
    + * </p>
    + *
    + * <pre>
    + * Foo __main__ +key1   [data for instance 1]
    + * Foo __main__ +key2   [data for instance 2]
    + * Foo __main__ -       [count of all Foo]
    + * Foo bar +yes +key1   [instance 1 key or data, depending on index type]
    + * Foo bar +yes +key2   [instance 2 key or data, depending on index type]
    + * Foo bar +yes -       [count of all Foo with "bar=yes" ]
    + * </pre>
    + *
    + * <p>
    + * Note that all indexed values are prepended with "+", even if the index 
itself does not have an
    + * explicit end marker. This allows for easily skipping to the end of an 
index by telling LevelDB
    + * to seek to the "phantom" end marker of the index.
    + * </p>
    + *
    + * <p>
    + * Child indices are stored after their parent index. In the example 
above, let's assume there is
    + * a child index "child", whose parent is "bar". If both instances have 
value "no" for this field,
    + * the data in the store would look something like the following:
    + * </p>
    + *
    + * <pre>
    + * ...
    + * Foo bar +yes -
    + * Foo bar .yes .child +no +key1   [instance 1 key or data, depending on 
index type]
    + * Foo bar .yes .child +no +key2   [instance 2 key or data, depending on 
index type]
    + * ...
    + * </pre>
    + */
    +class LevelDBTypeInfo {
    +
    +  static final byte[] END_MARKER = new byte[] { '-' };
    +  static final byte ENTRY_PREFIX = (byte) '+';
    +  static final byte KEY_SEPARATOR = 0x0;
    +  static byte TRUE = (byte) '1';
    +  static byte FALSE = (byte) '0';
    +
    +  private static final byte SECONDARY_IDX_PREFIX = (byte) '.';
    +  private static final byte POSITIVE_MARKER = (byte) '=';
    +  private static final byte NEGATIVE_MARKER = (byte) '*';
    +  private static final byte[] HEX_BYTES = new byte[] {
    +    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 
'e', 'f'
    +  };
    +
    +  private final LevelDB db;
    +  private final Class<?> type;
    +  private final Map<String, Index> indices;
    +  private final byte[] typePrefix;
    +
    +  LevelDBTypeInfo(LevelDB db, Class<?> type, byte[] alias) throws 
Exception {
    +    this.db = db;
    +    this.type = type;
    +    this.indices = new HashMap<>();
    +
    +    KVTypeInfo ti = new KVTypeInfo(type);
    +
    +    // First create the parent indices, then the child indices.
    +    ti.indices().forEach(idx -> {
    +      if (idx.parent().isEmpty()) {
    +        indices.put(idx.value(), new Index(idx, 
ti.getAccessor(idx.value()), null));
    +      }
    +    });
    +    ti.indices().forEach(idx -> {
    +      if (!idx.parent().isEmpty()) {
    +        indices.put(idx.value(), new Index(idx, 
ti.getAccessor(idx.value()),
    +          indices.get(idx.parent())));
    +      }
    +    });
    +
    +    this.typePrefix = alias;
    +  }
    +
    +  Class<?> type() {
    +    return type;
    +  }
    +
    +  byte[] keyPrefix() {
    +    return typePrefix;
    +  }
    +
    +  Index naturalIndex() {
    +    return index(KVIndex.NATURAL_INDEX_NAME);
    +  }
    +
    +  Index index(String name) {
    +    Index i = indices.get(name);
    +    Preconditions.checkArgument(i != null, "Index %s does not exist for 
type %s.", name,
    +      type.getName());
    +    return i;
    +  }
    +
    +  Collection<Index> indices() {
    +    return indices.values();
    +  }
    +
    +  byte[] buildKey(byte[]... components) {
    +    return buildKey(true, components);
    +  }
    +
    +  byte[] buildKey(boolean addTypePrefix, byte[]... components) {
    +    int len = 0;
    +    if (addTypePrefix) {
    +      len += typePrefix.length + 1;
    +    }
    +    for (byte[] comp : components) {
    +      len += comp.length;
    +    }
    +    len += components.length - 1;
    +
    +    byte[] dest = new byte[len];
    +    int written = 0;
    +
    +    if (addTypePrefix) {
    +      System.arraycopy(typePrefix, 0, dest, 0, typePrefix.length);
    +      dest[typePrefix.length] = KEY_SEPARATOR;
    +      written += typePrefix.length + 1;
    +    }
    +
    +    for (byte[] comp : components) {
    +      System.arraycopy(comp, 0, dest, written, comp.length);
    +      written += comp.length;
    +      if (written < dest.length) {
    +        dest[written] = KEY_SEPARATOR;
    +        written++;
    +      }
    +    }
    +
    +    return dest;
    +  }
    +
    +  /**
    +   * Models a single index in LevelDB. See top-level class's javadoc for a 
description of how the
    +   * keys are generated.
    +   */
    +  class Index {
    +
    +    private final boolean copy;
    +    private final boolean isNatural;
    +    private final byte[] name;
    +    private final KVTypeInfo.Accessor accessor;
    +    private final Index parent;
    +
    +    private Index(KVIndex self, KVTypeInfo.Accessor accessor, Index 
parent) {
    +      byte[] name = self.value().getBytes(UTF_8);
    +      if (parent != null) {
    +        byte[] child = new byte[name.length + 1];
    +        child[0] = SECONDARY_IDX_PREFIX;
    +        System.arraycopy(name, 0, child, 1, name.length);
    +      }
    +
    +      this.name = name;
    +      this.isNatural = self.value().equals(KVIndex.NATURAL_INDEX_NAME);
    +      this.copy = isNatural || self.copy();
    +      this.accessor = accessor;
    +      this.parent = parent;
    +    }
    +
    +    boolean isCopy() {
    +      return copy;
    +    }
    +
    +    boolean isChild() {
    +      return parent != null;
    +    }
    +
    +    Index parent() {
    +      return parent;
    +    }
    +
    +    /**
    +     * Creates a key prefix for child indices of this index. This allows 
the prefix to be
    +     * calculated only once, avoiding redundant work when multiple child 
indices of the
    +     * same parent index exist.
    +     */
    +    byte[] childPrefix(Object value) throws Exception {
    +      Preconditions.checkState(parent == null, "Not a parent index.");
    +      return buildKey(name, toParentKey(value));
    +    }
    +
    +    Object getValue(Object entity) throws Exception {
    +      return accessor.get(entity);
    +    }
    +
    +    private void checkParent(byte[] prefix) {
    +      if (prefix != null) {
    +        Preconditions.checkState(parent != null, "Parent prefix provided 
for parent index.");
    +      } else {
    +        Preconditions.checkState(parent == null, "Parent prefix missing 
for child index.");
    +      }
    +    }
    +
    +    /** The prefix for all keys that belong to this index. */
    +    byte[] keyPrefix(byte[] prefix) {
    +      checkParent(prefix);
    +      return (parent != null) ? buildKey(false, prefix, name) : 
buildKey(name);
    +    }
    +
    +    /** The key where to start ascending iteration for entries that match 
the given value. */
    +    byte[] start(byte[] prefix, Object value) {
    +      checkParent(prefix);
    +      return (parent != null) ? buildKey(false, prefix, name, toKey(value))
    +        : buildKey(name, toKey(value));
    +    }
    +
    +    /** The key for the index's end marker. */
    +    byte[] end(byte[] prefix) {
    +      checkParent(prefix);
    +      return (parent != null) ? buildKey(false, prefix, name, END_MARKER)
    +        : buildKey(name, END_MARKER);
    +    }
    +
    +    /** The key for the end marker for index entries with the given value. 
*/
    +    byte[] end(byte[] prefix, Object value) throws Exception {
    +      checkParent(prefix);
    +      return (parent != null) ? buildKey(false, prefix, name, 
toKey(value), END_MARKER)
    +        : buildKey(name, toKey(value), END_MARKER);
    +    }
    +
    +    /** The key in the index that identifies the given entity. */
    +    byte[] entityKey(byte[] prefix, Object entity) throws Exception {
    +      Object indexValue = getValue(entity);
    +      Preconditions.checkNotNull(indexValue, "Null index value for %s in 
type %s.",
    +        name, type.getName());
    +      byte[] entityKey = start(prefix, indexValue);
    +      if (!isNatural) {
    +        entityKey = buildKey(false, entityKey, 
toKey(naturalIndex().getValue(entity)));
    +      }
    +      return entityKey;
    +    }
    +
    +    private void updateCount(WriteBatch batch, byte[] key, long delta) 
throws Exception {
    +      long updated = getCount(key) + delta;
    +      if (updated > 0) {
    +        batch.put(key, db.serializer.serialize(updated));
    +      } else {
    +        batch.delete(key);
    +      }
    +    }
    +
    +    private void addOrRemove(
    +        WriteBatch batch,
    +        Object entity,
    +        Object existing,
    +        byte[] data,
    +        byte[] naturalKey,
    +        byte[] prefix) throws Exception {
    +      Object indexValue = getValue(entity);
    +      Preconditions.checkNotNull(indexValue, "Null index value for %s in 
type %s.",
    +        name, type.getName());
    +
    +      byte[] entityKey = start(prefix, indexValue);
    +      if (!isNatural) {
    +        entityKey = buildKey(false, entityKey, naturalKey);
    +      }
    +
    +      boolean needCountUpdate = (existing == null);
    +
    +      // Check whether the index key for the existing value matches the 
new value. If it doesn't,
    +      // then explicitly delete the existing key, otherwise just let the 
"put()" call overwrite it.
    +      //
    +      // Also check whether we need to update the counts. If the indexed 
value is changing, we
    +      // need to decrement the count at the old index value, and the new 
indexed value count needs
    +      // to be incremented.
    +      //
    +      // Natural indices don't need to be checked, because by definition 
both old and new elements
    +      // will have the same key.
    --- End diff --
    
    I had some trouble understanding this part at first, I couldn't understand 
why the values would differ -- now that I understand I see you had explained it 
in this last part of the comment.  I think it might have been easier for me to 
follow if you add the `existing` is the element that was previously stored in 
the *natural index* at the same key as the new entity, I had lost track of that 
detail when I was reading this code by itself.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to