http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java new file mode 100644 index 0000000..5d85d00 --- /dev/null +++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java @@ -0,0 +1,519 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.index.sasi.disk; + +import java.io.IOException; +import java.util.*; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.index.sasi.utils.AbstractIterator; +import org.apache.cassandra.index.sasi.utils.CombinedValue; +import org.apache.cassandra.index.sasi.utils.MappedBuffer; +import org.apache.cassandra.index.sasi.utils.RangeIterator; +import org.apache.cassandra.utils.MergeIterator; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.EntryType; + +// Note: all of the seek-able offsets contained in TokenTree should be sizeof(long) +// even if currently only lower int portion of them if used, because that makes +// it possible to switch to mmap implementation which supports long positions +// without any on-disk format changes and/or re-indexing if one day we'll have a need to. 
/**
 * Reader for the on-disk token tree written by {@link TokenTreeBuilder}: a block-based
 * tree mapping 64-bit tokens to key offsets. Interior blocks hold sorted tokens followed
 * by child block offsets; leaf blocks hold fixed-width 16-byte entries
 * (2-byte entry type, 2-byte extra offset data, 8-byte token, 4-byte offset data).
 * All layout constants come from {@link TokenTreeBuilder}.
 */
public class TokenTree
{
    private static final int LONG_BYTES = Long.SIZE / 8;
    private static final int SHORT_BYTES = Short.SIZE / 8;

    private final Descriptor descriptor;
    private final MappedBuffer file;
    // position of the root block; all child offsets stored in the tree are relative to this
    private final long startPos;
    private final long treeMinToken;
    private final long treeMaxToken;
    private final long tokenCount;

    @VisibleForTesting
    protected TokenTree(MappedBuffer tokenTree)
    {
        this(Descriptor.CURRENT, tokenTree);
    }

    /**
     * Opens a token tree whose root block begins at the buffer's current position.
     *
     * @param d the index format version descriptor (selects magic-number validation)
     * @param tokenTree buffer positioned at the start of the serialized tree
     * @throws IllegalArgumentException if the version-specific magic number does not match
     */
    public TokenTree(Descriptor d, MappedBuffer tokenTree)
    {
        descriptor = d;
        file = tokenTree;
        startPos = file.position();

        // skip the per-block header shared by all nodes; the root carries extra
        // tree-wide fields (magic, token count, min/max token) right after it
        file.position(startPos + TokenTreeBuilder.SHARED_HEADER_BYTES);

        if (!validateMagic())
            throw new IllegalArgumentException("invalid token tree");

        tokenCount = file.getLong();
        treeMinToken = file.getLong();
        treeMaxToken = file.getLong();
    }

    /** @return total number of distinct tokens stored in the tree */
    public long getCount()
    {
        return tokenCount;
    }

    /**
     * @param keyFetcher resolves a key offset to its DecoratedKey
     * @return an iterator over all tokens, in token order; operates on a duplicate
     *         buffer so concurrent iterators do not disturb each other's position
     */
    public RangeIterator<Long, Token> iterator(Function<Long, DecoratedKey> keyFetcher)
    {
        return new TokenTreeIterator(file.duplicate(), keyFetcher);
    }

    /**
     * Point lookup for a single token.
     *
     * @return the on-disk token entry, or null if the tree holds no exact match
     */
    public OnDiskToken get(final long searchToken, Function<Long, DecoratedKey> keyFetcher)
    {
        seekToLeaf(searchToken, file);
        long leafStart = file.position();
        short leafSize = file.getShort(leafStart + 1); // skip the info byte

        file.position(leafStart + TokenTreeBuilder.BLOCK_HEADER_BYTES); // skip to tokens
        short tokenIndex = searchLeaf(searchToken, leafSize);

        // searchLeaf uses absolute reads but we reset anyway so getTokenAt can use
        // the current position as the base of the leaf's entry array
        file.position(leafStart + TokenTreeBuilder.BLOCK_HEADER_BYTES);

        OnDiskToken token = OnDiskToken.getTokenAt(file, tokenIndex, leafSize, keyFetcher);
        // seekToLeaf only guarantees the leaf *could* contain the token; verify the match
        return token.get().equals(searchToken) ? token : null;
    }

    /**
     * Validates the version-specific magic number. Version AA predates the magic
     * number, so it is accepted unconditionally; AB reads and checks a 2-byte magic.
     */
    private boolean validateMagic()
    {
        switch (descriptor.version.toString())
        {
            case Descriptor.VERSION_AA:
                return true;
            case Descriptor.VERSION_AB:
                return TokenTreeBuilder.AB_MAGIC == file.getShort();
            default:
                return false;
        }
    }

    // finds leaf that *could* contain token
    private void seekToLeaf(long token, MappedBuffer file)
    {
        // this loop always seeks forward except for the first iteration
        // where it may seek back to the root
        long blockStart = startPos;
        while (true)
        {
            file.position(blockStart);

            byte info = file.get();
            boolean isLeaf = (info & 1) == 1; // bit 0 of the info byte marks a leaf

            if (isLeaf)
            {
                // leave the buffer positioned at the start of the leaf block
                file.position(blockStart);
                break;
            }

            short tokenCount = file.getShort();

            long minToken = file.getLong();
            long maxToken = file.getLong();

            // interior block layout after the header: tokenCount tokens (8 bytes each),
            // then tokenCount + 1 child offsets (8 bytes each)
            long seekBase = blockStart + TokenTreeBuilder.BLOCK_HEADER_BYTES;
            if (minToken > token)
            {
                // seek to beginning of child offsets to locate first child
                file.position(seekBase + tokenCount * LONG_BYTES);
                blockStart = (startPos + (int) file.getLong());
            }
            else if (maxToken < token)
            {
                // seek to end of child offsets to locate last child
                file.position(seekBase + (2 * tokenCount) * LONG_BYTES);
                blockStart = (startPos + (int) file.getLong());
            }
            else
            {
                // skip to end of block header/start of interior block tokens
                file.position(seekBase);

                short offsetIndex = searchBlock(token, tokenCount, file);

                // file pointer is now at beginning of offsets
                // NOTE: searchBlock may break out early, leaving the position at
                // seekBase + (offsetIndex + 1) * LONG_BYTES; both branches below
                // land on child offset #offsetIndex relative to that position:
                //   full scan  -> advance offsetIndex slots into the offset array
                //   early exit -> skip the (tokenCount - offsetIndex - 1) unread
                //                 tokens plus offsetIndex offset slots
                if (offsetIndex == tokenCount)
                    file.position(file.position() + (offsetIndex * LONG_BYTES));
                else
                    file.position(file.position() + ((tokenCount - offsetIndex - 1) + offsetIndex) * LONG_BYTES);

                blockStart = (startPos + (int) file.getLong());
            }
        }
    }

    /**
     * Linear scan of an interior block's tokens; returns the index of the child
     * subtree that could contain searchToken (number of tokens <= searchToken).
     * Advances the buffer position by one long per token examined.
     */
    private short searchBlock(long searchToken, short tokenCount, MappedBuffer file)
    {
        short offsetIndex = 0;
        for (int i = 0; i < tokenCount; i++)
        {
            long readToken = file.getLong();
            if (searchToken < readToken)
                break;

            offsetIndex++;
        }

        return offsetIndex;
    }

    /**
     * Binary search within a leaf for searchToken. Uses absolute reads; the buffer
     * position (entry array base) is not modified.
     *
     * NOTE(review): end starts at tokenCount (one past the last valid index) rather
     * than tokenCount - 1, so middle can land one entry past the end; callers such
     * as get() re-validate the returned entry's token, which appears to make this
     * tolerable — confirm against the builder's leaf layout.
     */
    private short searchLeaf(long searchToken, short tokenCount)
    {
        long base = file.position();

        int start = 0;
        int end = tokenCount;
        int middle = 0;

        while (start <= end)
        {
            middle = start + ((end - start) >> 1);

            // each entry is 16 bytes wide, token is in bytes 4-11
            long token = file.getLong(base + (middle * (2 * LONG_BYTES) + 4));

            if (token == searchToken)
                break;

            if (token < searchToken)
                start = middle + 1;
            else
                end = middle - 1;
        }

        return (short) middle;
    }

    /**
     * Ordered iterator over every token in the tree. Walks the leaf level
     * left-to-right, relying on leaf blocks being laid out contiguously
     * (seekToNextLeaf simply jumps BLOCK_BYTES forward).
     */
    public class TokenTreeIterator extends RangeIterator<Long, Token>
    {
        private final Function<Long, DecoratedKey> keyFetcher;
        private final MappedBuffer file;

        private long currentLeafStart;  // absolute position of the current leaf block
        private int currentTokenIndex;  // next entry index to return within the leaf

        private long leafMinToken;
        private long leafMaxToken;
        private short leafSize;

        protected boolean firstIteration = true;
        private boolean lastLeaf;

        TokenTreeIterator(MappedBuffer file, Function<Long, DecoratedKey> keyFetcher)
        {
            super(treeMinToken, treeMaxToken, tokenCount);

            this.file = file;
            this.keyFetcher = keyFetcher;
        }

        protected Token computeNext()
        {
            maybeFirstIteration();

            if (currentTokenIndex >= leafSize && lastLeaf)
                return endOfData();

            if (currentTokenIndex < leafSize) // tokens remaining in this leaf
            {
                return getTokenAt(currentTokenIndex++);
            }
            else // no more tokens remaining in this leaf
            {
                assert !lastLeaf;

                seekToNextLeaf();
                setupBlock();
                return computeNext();
            }
        }

        protected void performSkipTo(Long nextToken)
        {
            maybeFirstIteration();

            if (nextToken <= leafMaxToken) // next is in this leaf block
            {
                searchLeaf(nextToken);
            }
            else // next is in a leaf block that needs to be found
            {
                seekToLeaf(nextToken, file);
                setupBlock();
                findNearest(nextToken);
            }
        }

        /**
         * Reads the current leaf's header (info byte, size, min/max token) and
         * positions the buffer at the start of its entry data.
         */
        private void setupBlock()
        {
            currentLeafStart = file.position();
            currentTokenIndex = 0;

            lastLeaf = (file.get() & (1 << TokenTreeBuilder.LAST_LEAF_SHIFT)) > 0;
            leafSize = file.getShort();

            leafMinToken = file.getLong();
            leafMaxToken = file.getLong();

            // seek to end of leaf header/start of data
            file.position(currentLeafStart + TokenTreeBuilder.BLOCK_HEADER_BYTES);
        }

        /**
         * Advances leaf-by-leaf (recursively) until the leaf whose range covers
         * next, then positions currentTokenIndex at the first entry >= next.
         */
        private void findNearest(Long next)
        {
            if (next > leafMaxToken && !lastLeaf)
            {
                seekToNextLeaf();
                setupBlock();
                findNearest(next);
            }
            else if (next > leafMinToken)
                searchLeaf(next);
        }

        /**
         * Linear scan forward from currentTokenIndex to the first entry >= next
         * within the current leaf.
         */
        private void searchLeaf(long next)
        {
            for (int i = currentTokenIndex; i < leafSize; i++)
            {
                if (compareTokenAt(currentTokenIndex, next) >= 0)
                    break;

                currentTokenIndex++;
            }
        }

        private int compareTokenAt(int idx, long toToken)
        {
            return Long.compare(file.getLong(getTokenPosition(idx)), toToken);
        }

        private Token getTokenAt(int idx)
        {
            return OnDiskToken.getTokenAt(file, idx, leafSize, keyFetcher);
        }

        private long getTokenPosition(int idx)
        {
            // skip 4 byte entry header to get position pointing directly at the entry's token
            return OnDiskToken.getEntryPosition(idx, file) + (2 * SHORT_BYTES);
        }

        private void seekToNextLeaf()
        {
            // leaves are serialized contiguously at the bottom level, one block apart
            file.position(currentLeafStart + TokenTreeBuilder.BLOCK_BYTES);
        }

        public void close() throws IOException
        {
            // nothing to do here
        }

        private void maybeFirstIteration()
        {
            // seek to the first token only when requested for the first time,
            // highly predictable branch and saves us a lot by not traversing the tree
            // on creation time because it's not at all required.
            if (!firstIteration)
                return;

            seekToLeaf(treeMinToken, file);
            setupBlock();
            firstIteration = false;
        }
    }

    /**
     * A token as materialized from one or more on-disk leaf entries. Tokens with the
     * same value read from different trees can be merged together, accumulating either
     * further on-disk entry references (TokenInfo) or already-loaded keys.
     */
    public static class OnDiskToken extends Token
    {
        // on-disk entries contributing keys for this token value
        private final Set<TokenInfo> info = new HashSet<>(2);
        // keys merged in from non-on-disk Token instances
        private final Set<DecoratedKey> loadedKeys = new TreeSet<>(DecoratedKey.comparator);

        public OnDiskToken(MappedBuffer buffer, long position, short leafSize, Function<Long, DecoratedKey> keyFetcher)
        {
            // the token value lives 4 bytes (two shorts) into the 16-byte entry
            super(buffer.getLong(position + (2 * SHORT_BYTES)));
            info.add(new TokenInfo(buffer, position, leafSize, keyFetcher));
        }

        /**
         * Merges another Token carrying the same token value into this one.
         *
         * @throws IllegalArgumentException if the other token's value differs
         */
        public void merge(CombinedValue<Long> other)
        {
            if (!(other instanceof Token))
                return;

            Token o = (Token) other;
            if (token != o.token)
                throw new IllegalArgumentException(String.format("%s != %s", token, o.token));

            if (o instanceof OnDiskToken)
            {
                info.addAll(((OnDiskToken) other).info);
            }
            else
            {
                Iterators.addAll(loadedKeys, o.iterator());
            }
        }

        /**
         * @return de-duplicated, key-ordered iterator over all keys from every
         *         merged source (on-disk entries plus loaded keys)
         */
        public Iterator<DecoratedKey> iterator()
        {
            List<Iterator<DecoratedKey>> keys = new ArrayList<>(info.size());

            for (TokenInfo i : info)
                keys.add(i.iterator());

            if (!loadedKeys.isEmpty())
                keys.add(loadedKeys.iterator());

            return MergeIterator.get(keys, DecoratedKey.comparator, new MergeIterator.Reducer<DecoratedKey, DecoratedKey>()
            {
                DecoratedKey reduced = null;

                public boolean trivialReduceIsTrivial()
                {
                    return true;
                }

                public void reduce(int idx, DecoratedKey current)
                {
                    reduced = current;
                }

                protected DecoratedKey getReduced()
                {
                    return reduced;
                }
            });
        }

        /** @return the union of raw key offsets across all merged on-disk entries */
        public Set<Long> getOffsets()
        {
            Set<Long> offsets = new HashSet<>();
            for (TokenInfo i : info)
            {
                for (long offset : i.fetchOffsets())
                    offsets.add(offset);
            }

            return offsets;
        }

        public static OnDiskToken getTokenAt(MappedBuffer buffer, int idx, short leafSize, Function<Long, DecoratedKey> keyFetcher)
        {
            return new OnDiskToken(buffer, getEntryPosition(idx, buffer), leafSize, keyFetcher);
        }

        private static long getEntryPosition(int idx, MappedBuffer file)
        {
            // info (4 bytes) + token (8 bytes) + offset (4 bytes) = 16 bytes
            // base is the buffer's current position, i.e. the start of the leaf's entry array
            return file.position() + (idx * (2 * LONG_BYTES));
        }
    }

    /**
     * Reference to a single 16-byte leaf entry; decodes the entry's offsets lazily.
     * Identity (equals/hashCode) is by buffer position + keyFetcher so duplicate
     * references to the same entry collapse inside OnDiskToken.info.
     */
    private static class TokenInfo
    {
        private final MappedBuffer buffer;
        private final Function<Long, DecoratedKey> keyFetcher;

        private final long position;   // absolute position of the entry within the buffer
        private final short leafSize;  // entry count of the owning leaf (needed for OVERFLOW)

        public TokenInfo(MappedBuffer buffer, long position, short leafSize, Function<Long, DecoratedKey> keyFetcher)
        {
            this.keyFetcher = keyFetcher;
            this.buffer = buffer;
            this.position = position;
            this.leafSize = leafSize;
        }

        public Iterator<DecoratedKey> iterator()
        {
            return new KeyIterator(keyFetcher, fetchOffsets());
        }

        public int hashCode()
        {
            return new HashCodeBuilder().append(keyFetcher).append(position).append(leafSize).build();
        }

        public boolean equals(Object other)
        {
            if (!(other instanceof TokenInfo))
                return false;

            TokenInfo o = (TokenInfo) other;
            return keyFetcher == o.keyFetcher && position == o.position;
        }

        /**
         * Decodes the entry's key offsets according to its EntryType
         * (see the LeafEntry subclasses in TokenTreeBuilder for the encodings):
         *   SIMPLE   - one offset stored in the 4-byte offsetData field
         *   FACTORED - one large offset split across offsetData (upper bits) and
         *              offsetExtra (lower 16 bits)
         *   PACKED   - two offsets, one in offsetData and one in offsetExtra
         *   OVERFLOW - offsetExtra is a count, offsetData an index into the leaf's
         *              overflow trailer that follows the 16-byte entry array
         */
        private long[] fetchOffsets()
        {
            short info = buffer.getShort(position);
            short offsetShort = buffer.getShort(position + SHORT_BYTES);
            int offsetInt = buffer.getInt(position + (2 * SHORT_BYTES) + LONG_BYTES);

            EntryType type = EntryType.of(info & TokenTreeBuilder.ENTRY_TYPE_MASK);

            switch (type)
            {
                case SIMPLE:
                    return new long[] { offsetInt };

                case OVERFLOW:
                    long[] offsets = new long[offsetShort]; // offsetShort contains count of tokens
                    // trailer begins after leafSize 16-byte entries; offsetInt indexes into it
                    long offsetPos = (buffer.position() + (2 * (leafSize * LONG_BYTES)) + (offsetInt * LONG_BYTES));

                    for (int i = 0; i < offsetShort; i++)
                        offsets[i] = buffer.getLong(offsetPos + (i * LONG_BYTES));

                    return offsets;

                case FACTORED:
                    return new long[] { (((long) offsetInt) << Short.SIZE) + offsetShort };

                case PACKED:
                    return new long[] { offsetShort, offsetInt };

                default:
                    throw new IllegalStateException("Unknown entry type: " + type);
            }
        }
    }

    /** Maps an array of raw key offsets to DecoratedKeys via the keyFetcher, in order. */
    private static class KeyIterator extends AbstractIterator<DecoratedKey>
    {
        private final Function<Long, DecoratedKey> keyFetcher;
        private final long[] offsets;
        private int index = 0;

        public KeyIterator(Function<Long, DecoratedKey> keyFetcher, long[] offsets)
        {
            this.keyFetcher = keyFetcher;
            this.offsets = offsets;
        }

        public DecoratedKey computeNext()
        {
            return index < offsets.length ? keyFetcher.apply(offsets[index++]) : endOfData();
        }
    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java new file mode 100644 index 0000000..e10b057 --- /dev/null +++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java @@ -0,0 +1,839 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package org.apache.cassandra.index.sasi.disk;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;

import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;

import com.carrotsearch.hppc.LongArrayList;
import com.carrotsearch.hppc.LongSet;
import com.carrotsearch.hppc.cursors.LongCursor;
import com.carrotsearch.hppc.LongOpenHashSet;
import com.google.common.collect.AbstractIterator;

/**
 * Builds and serializes the on-disk token tree read by {@link TokenTree}: a bulk-loaded,
 * block-aligned tree mapping 64-bit tokens to sets of key offsets. Each serialized block
 * is BLOCK_BYTES (4096) wide with a BLOCK_HEADER_BYTES (64) header; leaves carry 16-byte
 * entries plus an OVERFLOW_TRAILER_BYTES trailer for entries whose offsets don't fit in
 * the entry itself.
 */
public class TokenTreeBuilder
{
    // note: ordinal positions are used here, do not change order
    enum EntryType
    {
        SIMPLE, FACTORED, PACKED, OVERFLOW;

        /** Reverse of ordinal(); used when decoding the 2-byte entry-type field. */
        public static EntryType of(int ordinal)
        {
            if (ordinal == SIMPLE.ordinal())
                return SIMPLE;

            if (ordinal == FACTORED.ordinal())
                return FACTORED;

            if (ordinal == PACKED.ordinal())
                return PACKED;

            if (ordinal == OVERFLOW.ordinal())
                return OVERFLOW;

            throw new IllegalArgumentException("Unknown ordinal: " + ordinal);
        }
    }

    public static final int BLOCK_BYTES = 4096;
    public static final int BLOCK_HEADER_BYTES = 64;
    public static final int OVERFLOW_TRAILER_BYTES = 64;
    // trailer holds 8-byte offsets
    public static final int OVERFLOW_TRAILER_CAPACITY = OVERFLOW_TRAILER_BYTES / 8;
    // leaf entries are 16 bytes wide
    public static final int TOKENS_PER_BLOCK = (BLOCK_BYTES - BLOCK_HEADER_BYTES - OVERFLOW_TRAILER_BYTES) / 16;
    public static final long MAX_OFFSET = (1L << 47) - 1; // 48 bits for (signed) offset
    public static final byte LAST_LEAF_SHIFT = 1;
    public static final byte SHARED_HEADER_BYTES = 19;
    public static final byte ENTRY_TYPE_MASK = 0x03;
    public static final short AB_MAGIC = 0x5A51;

    // token -> set of key offsets; sorted so the tree can be bulk-loaded in order
    private final SortedMap<Long, LongSet> tokens = new TreeMap<>();
    private int numBlocks;

    private Node root;
    private InteriorNode rightmostParent; // interior node currently accepting new leaves
    private Leaf leftmostLeaf;
    private Leaf rightmostLeaf;
    private long tokenCount = 0;
    private long treeMinToken;
    private long treeMaxToken;

    public TokenTreeBuilder()
    {}

    public TokenTreeBuilder(SortedMap<Long, LongSet> data)
    {
        add(data);
    }

    /** Records a single (token, key offset) pair; offsets for equal tokens accumulate. */
    public void add(Long token, long keyPosition)
    {
        LongSet found = tokens.get(token);
        if (found == null)
            tokens.put(token, (found = new LongOpenHashSet(2)));

        found.add(keyPosition);
    }

    /** Merges another token -> offsets map into this builder, unioning offset sets. */
    public void add(SortedMap<Long, LongSet> data)
    {
        for (Map.Entry<Long, LongSet> newEntry : data.entrySet())
        {
            LongSet found = tokens.get(newEntry.getKey());
            if (found == null)
                tokens.put(newEntry.getKey(), (found = new LongOpenHashSet(4)));

            for (LongCursor offset : newEntry.getValue())
                found.add(offset.value);
        }
    }

    /** Builds the tree structure (idempotent); must be called before write(). */
    public TokenTreeBuilder finish()
    {
        maybeBulkLoad();
        return this;
    }

    public SortedMap<Long, LongSet> getTokens()
    {
        return tokens;
    }

    public long getTokenCount()
    {
        return tokenCount;
    }

    /**
     * @return exact number of bytes write() will produce; a single-block tree is
     *         written unaligned (header + 16 bytes per entry), otherwise every
     *         block is padded to BLOCK_BYTES
     */
    public int serializedSize()
    {
        if (numBlocks == 1)
            return (BLOCK_HEADER_BYTES + ((int) tokenCount * 16));
        else
            return numBlocks * BLOCK_BYTES;
    }

    /**
     * Serializes the tree level by level (root first, leaves last). Child block
     * offsets are computed from childBlockIndex, the running count of blocks
     * assigned so far, which matches this breadth-first emission order.
     */
    public void write(DataOutputPlus out) throws IOException
    {
        ByteBuffer blockBuffer = ByteBuffer.allocate(BLOCK_BYTES);
        Iterator<Node> levelIterator = root.levelIterator();
        long childBlockIndex = 1;

        while (levelIterator != null)
        {

            Node firstChild = null;
            while (levelIterator.hasNext())
            {
                Node block = levelIterator.next();

                // remember the first child so we can descend a level once this one is done
                if (firstChild == null && !block.isLeaf())
                    firstChild = ((InteriorNode) block).children.get(0);

                block.serialize(childBlockIndex, blockBuffer);
                // pad to block size unless the whole tree is one (root leaf) block
                flushBuffer(blockBuffer, out, numBlocks != 1);

                childBlockIndex += block.childCount();
            }

            levelIterator = (firstChild == null) ? null : firstChild.levelIterator();
        }
    }

    /** @return in-order iterator over (token, offsets) pairs via the leaf level */
    public Iterator<Pair<Long, LongSet>> iterator()
    {
        return new TokenIterator(leftmostLeaf.levelIterator());
    }

    private void maybeBulkLoad()
    {
        if (root == null)
            bulkLoad();
    }

    private void flushBuffer(ByteBuffer buffer, DataOutputPlus o, boolean align) throws IOException
    {
        // seek to end of last block before flushing
        if (align)
            alignBuffer(buffer, BLOCK_BYTES);

        buffer.flip();
        o.write(buffer);
        buffer.clear();
    }

    private static void alignBuffer(ByteBuffer buffer, int blockSize)
    {
        long curPos = buffer.position();
        if ((curPos & (blockSize - 1)) != 0) // align on the block boundary if needed
            buffer.position((int) FBUtilities.align(curPos, blockSize));
    }

    /**
     * Bulk-loads the sorted token map into a tree: tokens are sliced into
     * TOKENS_PER_BLOCK-sized leaves chained left to right, and interior nodes are
     * grown on demand via InteriorNode.add/split as each leaf is attached.
     */
    private void bulkLoad()
    {
        tokenCount = tokens.size();
        treeMinToken = tokens.firstKey();
        treeMaxToken = tokens.lastKey();
        numBlocks = 1;

        // special case the tree that only has a single block in it (so we don't create a useless root)
        if (tokenCount <= TOKENS_PER_BLOCK)
        {
            leftmostLeaf = new Leaf(tokens);
            rightmostLeaf = leftmostLeaf;
            root = leftmostLeaf;
        }
        else
        {
            root = new InteriorNode();
            rightmostParent = (InteriorNode) root;

            int i = 0;
            Leaf lastLeaf = null;
            Long firstToken = tokens.firstKey();
            Long finalToken = tokens.lastKey();
            Long lastToken;
            for (Long token : tokens.keySet())
            {
                // only act on leaf boundaries: every TOKENS_PER_BLOCK-th token,
                // or the very last token
                if (i == 0 || (i % TOKENS_PER_BLOCK != 0 && i != (tokenCount - 1)))
                {
                    i++;
                    continue;
                }

                lastToken = token;
                // a trailing partial slice is emitted via tailMap; full slices via subMap
                Leaf leaf = (i != (tokenCount - 1) || token.equals(finalToken)) ?
                        new Leaf(tokens.subMap(firstToken, lastToken)) : new Leaf(tokens.tailMap(firstToken));

                if (i == TOKENS_PER_BLOCK)
                    leftmostLeaf = leaf;
                else
                    lastLeaf.next = leaf;

                rightmostParent.add(leaf);
                lastLeaf = leaf;
                rightmostLeaf = leaf;
                firstToken = lastToken;
                i++;
                numBlocks++;

                // if the boundary token is also the final token, it still needs a home:
                // emit one last leaf containing just the tail
                if (token.equals(finalToken))
                {
                    Leaf finalLeaf = new Leaf(tokens.tailMap(token));
                    lastLeaf.next = finalLeaf;
                    rightmostParent.add(finalLeaf);
                    rightmostLeaf = finalLeaf;
                    numBlocks++;
                }
            }

        }
    }

    /**
     * Base class for tree nodes. Nodes on the same level are chained via next,
     * which is what levelIterator() walks.
     */
    private abstract class Node
    {
        protected InteriorNode parent;
        protected Node next;
        protected Long nodeMinToken, nodeMaxToken;

        public abstract void serialize(long childBlockIndex, ByteBuffer buf);
        public abstract int childCount();
        public abstract int tokenCount();
        public abstract Long smallestToken();

        public Iterator<Node> levelIterator()
        {
            return new LevelIterator(this);
        }

        public boolean isLeaf()
        {
            return (this instanceof Leaf);
        }

        protected boolean isLastLeaf()
        {
            return this == rightmostLeaf;
        }

        protected boolean isRoot()
        {
            return this == root;
        }

        protected void updateTokenRange(long token)
        {
            nodeMinToken = nodeMinToken == null ? token : Math.min(nodeMinToken, token);
            nodeMaxToken = nodeMaxToken == null ? token : Math.max(nodeMaxToken, token);
        }

        /** Writes the node header and pads it to BLOCK_HEADER_BYTES. */
        protected void serializeHeader(ByteBuffer buf)
        {
            Header header;
            if (isRoot())
                header = new RootHeader();
            else if (!isLeaf())
                header = new InteriorNodeHeader();
            else
                header = new LeafHeader();

            header.serialize(buf);
            alignBuffer(buf, BLOCK_HEADER_BYTES);
        }

        /** Common header: info byte, entry count, node min/max token (19 bytes). */
        private abstract class Header
        {
            public void serialize(ByteBuffer buf)
            {
                buf.put(infoByte())
                   .putShort((short) (tokenCount()))
                   .putLong(nodeMinToken)
                   .putLong(nodeMaxToken);
            }

            protected abstract byte infoByte();
        }

        /** Root header additionally carries the magic and tree-wide count/min/max. */
        private class RootHeader extends Header
        {
            public void serialize(ByteBuffer buf)
            {
                super.serialize(buf);
                writeMagic(buf);
                buf.putLong(tokenCount)
                   .putLong(treeMinToken)
                   .putLong(treeMaxToken);
            }

            protected byte infoByte()
            {
                // if leaf, set leaf indicator and last leaf indicator (bits 0 & 1)
                // if not leaf, clear both bits
                return (byte) ((isLeaf()) ? 3 : 0);
            }

            protected void writeMagic(ByteBuffer buf)
            {
                switch (Descriptor.CURRENT_VERSION)
                {
                    case Descriptor.VERSION_AB:
                        buf.putShort(AB_MAGIC);
                        break;
                    default:
                        break;
                }

            }
        }

        private class InteriorNodeHeader extends Header
        {
            // bit 0 (leaf indicator) & bit 1 (last leaf indicator) cleared
            protected byte infoByte()
            {
                return 0;
            }
        }

        private class LeafHeader extends Header
        {
            // bit 0 set as leaf indicator
            // bit 1 set if this is last leaf of data
            protected byte infoByte()
            {
                byte infoByte = 1;
                infoByte |= (isLastLeaf()) ? (1 << LAST_LEAF_SHIFT) : 0;

                return infoByte;
            }
        }

    }

    /**
     * Leaf node: a slice of the sorted token map. Serialized as the block header,
     * one 16-byte entry per token, then any overflow offsets as a trailer.
     */
    private class Leaf extends Node
    {
        private final SortedMap<Long, LongSet> tokens;
        // offsets that did not fit inside their entries; indexed by OVERFLOW entries
        private LongArrayList overflowCollisions;

        Leaf(SortedMap<Long, LongSet> data)
        {
            nodeMinToken = data.firstKey();
            nodeMaxToken = data.lastKey();
            tokens = data;
        }

        public Long largestToken()
        {
            return nodeMaxToken;
        }

        public void serialize(long childBlockIndex, ByteBuffer buf)
        {
            serializeHeader(buf);
            serializeData(buf);
            serializeOverflowCollisions(buf);
        }

        public int childCount()
        {
            return 0;
        }

        public int tokenCount()
        {
            return tokens.size();
        }

        public Long smallestToken()
        {
            return nodeMinToken;
        }

        public Iterator<Map.Entry<Long, LongSet>> tokenIterator()
        {
            return tokens.entrySet().iterator();
        }

        private void serializeData(ByteBuffer buf)
        {
            for (Map.Entry<Long, LongSet> entry : tokens.entrySet())
                createEntry(entry.getKey(), entry.getValue()).serialize(buf);
        }

        private void serializeOverflowCollisions(ByteBuffer buf)
        {
            if (overflowCollisions != null)
                for (LongCursor offset : overflowCollisions)
                    buf.putLong(offset.value);
        }


        /**
         * Picks the tightest entry encoding for a token's offsets:
         * one int-sized offset -> SIMPLE; one larger (<= 48-bit) offset -> FACTORED;
         * two offsets fitting an int + a short -> PACKED; anything else -> OVERFLOW.
         */
        private LeafEntry createEntry(final long tok, final LongSet offsets)
        {
            int offsetCount = offsets.size();
            switch (offsetCount)
            {
                case 0:
                    throw new AssertionError("no offsets for token " + tok);
                case 1:
                    long offset = offsets.toArray()[0];
                    if (offset > MAX_OFFSET)
                        throw new AssertionError("offset " + offset + " cannot be greater than " + MAX_OFFSET);
                    else if (offset <= Integer.MAX_VALUE)
                        return new SimpleLeafEntry(tok, offset);
                    else
                        return new FactoredOffsetLeafEntry(tok, offset);
                case 2:
                    long[] rawOffsets = offsets.toArray();
                    // packing requires both to fit in an int and at least one in a short
                    if (rawOffsets[0] <= Integer.MAX_VALUE && rawOffsets[1] <= Integer.MAX_VALUE &&
                        (rawOffsets[0] <= Short.MAX_VALUE || rawOffsets[1] <= Short.MAX_VALUE))
                        return new PackedCollisionLeafEntry(tok, rawOffsets);
                    else
                        return createOverflowEntry(tok, offsetCount, offsets);
                default:
                    return createOverflowEntry(tok, offsetCount, offsets);
            }
        }

        /**
         * Appends the offsets to the leaf's overflow trailer and returns an entry
         * recording the starting index and count within that trailer.
         */
        private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongSet offsets)
        {
            if (overflowCollisions == null)
                overflowCollisions = new LongArrayList();

            LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) overflowCollisions.size(), (short) offsetCount);
            for (LongCursor o : offsets) {
                if (overflowCollisions.size() == OVERFLOW_TRAILER_CAPACITY)
                    throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf");
                else
                    overflowCollisions.add(o.value);
            }
            return entry;
        }

        /**
         * A 16-byte serialized leaf entry:
         * type (2 bytes) | offsetExtra (2 bytes) | token (8 bytes) | offsetData (4 bytes).
         */
        private abstract class LeafEntry
        {
            protected final long token;

            abstract public EntryType type();
            abstract public int offsetData();
            abstract public short offsetExtra();

            public LeafEntry(final long tok)
            {
                token = tok;
            }

            public void serialize(ByteBuffer buf)
            {
                buf.putShort((short) type().ordinal())
                   .putShort(offsetExtra())
                   .putLong(token)
                   .putInt(offsetData());
            }

        }


        // assumes there is a single offset and the offset is <= Integer.MAX_VALUE
        private class SimpleLeafEntry extends LeafEntry
        {
            private final long offset;

            public SimpleLeafEntry(final long tok, final long off)
            {
                super(tok);
                offset = off;
            }

            public EntryType type()
            {
                return EntryType.SIMPLE;
            }

            public int offsetData()
            {
                return (int) offset;
            }

            public short offsetExtra()
            {
                return 0;
            }
        }

        // assumes there is a single offset and Integer.MAX_VALUE < offset <= MAX_OFFSET
        // take the middle 32 bits of offset (or the top 32 when considering offset is max 48 bits)
        // and store where offset is normally stored. take bottom 16 bits of offset and store in entry header
        private class FactoredOffsetLeafEntry extends LeafEntry
        {
            private final long offset;

            public FactoredOffsetLeafEntry(final long tok, final long off)
            {
                super(tok);
                offset = off;
            }

            public EntryType type()
            {
                return EntryType.FACTORED;
            }

            public int offsetData()
            {
                return (int) (offset >>> Short.SIZE);
            }

            public short offsetExtra()
            {
                return (short) offset;
            }
        }

        // holds an entry with two offsets that can be packed in an int & a short
        // the int offset is stored where offset is normally stored. short offset is
        // stored in entry header
        private class PackedCollisionLeafEntry extends LeafEntry
        {
            private short smallerOffset;
            private int largerOffset;

            public PackedCollisionLeafEntry(final long tok, final long[] offs)
            {
                super(tok);

                smallerOffset = (short) Math.min(offs[0], offs[1]);
                largerOffset = (int) Math.max(offs[0], offs[1]);
            }

            public EntryType type()
            {
                return EntryType.PACKED;
            }

            public int offsetData()
            {
                return largerOffset;
            }

            public short offsetExtra()
            {
                return smallerOffset;
            }
        }

        // holds an entry with three or more offsets, or two offsets that cannot
        // be packed into an int & a short. the index into the overflow list
        // is stored where the offset is normally stored. the number of overflowed offsets
        // for the entry is stored in the entry header
        private class OverflowCollisionLeafEntry extends LeafEntry
        {
            private final short startIndex;
            private final short count;

            public OverflowCollisionLeafEntry(final long tok, final short collisionStartIndex, final short collisionCount)
            {
                super(tok);
                startIndex = collisionStartIndex;
                count = collisionCount;
            }

            public EntryType type()
            {
                return EntryType.OVERFLOW;
            }

            public int offsetData()
            {
                return startIndex;
            }

            public short offsetExtra()
            {
                return count;
            }

        }

    }

    /**
     * Interior node: sorted separator tokens plus child pointers. Serialized as
     * the block header, the tokens, then one absolute block offset per child
     * (derived from childBlockIndex and the breadth-first write order).
     */
    private class InteriorNode extends Node
    {
        private List<Long> tokens = new ArrayList<>(TOKENS_PER_BLOCK);
        private List<Node> children = new ArrayList<>(TOKENS_PER_BLOCK + 1);
        private int position = 0; // TODO (jwest): can get rid of this and use array size


        public void serialize(long childBlockIndex, ByteBuffer buf)
        {
            serializeHeader(buf);
            serializeTokens(buf);
            serializeChildOffsets(childBlockIndex, buf);
        }

        public int childCount()
        {
            return children.size();
        }

        public int tokenCount()
        {
            return tokens.size();
        }

        public Long smallestToken()
        {
            return tokens.get(0);
        }

        /**
         * Inserts a separator token (with its adjacent children) coming up from a
         * child split; splits this node too if it is already full.
         */
        protected void add(Long token, InteriorNode leftChild, InteriorNode rightChild)
        {
            int pos = tokens.size();
            if (pos == TOKENS_PER_BLOCK)
            {
                InteriorNode sibling = split();
                sibling.add(token, leftChild, rightChild);

            }
            else {
                if (leftChild != null)
                    children.add(pos, leftChild);

                if (rightChild != null)
                {
                    children.add(pos + 1, rightChild);
                    rightChild.parent = this;
                }

                updateTokenRange(token);
                tokens.add(pos, token);
            }
        }

        /**
         * Appends a leaf during bulk load; the leaf's smallest token becomes the
         * separator for all leaves after the first. Splits when this node is full.
         */
        protected void add(Leaf node)
        {

            if (position == (TOKENS_PER_BLOCK + 1))
            {
                rightmostParent = split();
                rightmostParent.add(node);
            }
            else
            {

                node.parent = this;
                children.add(position, node);
                position++;

                // the first child is referenced only during bulk load. we don't take a value
                // to store into the tree, one is subtracted since position has already been incremented
                // for the next node to be added
                if (position - 1 == 0)
                    return;


                // tokens are inserted one behind the current position, but 2 is subtracted because
                // position has already been incremented for the next add
                Long smallestToken = node.smallestToken();
                updateTokenRange(smallestToken);
                tokens.add(position - 2, smallestToken);
            }

        }

        /**
         * Splits this node, promoting the middle separator into the parent
         * (creating a new root if this node was the root).
         *
         * @return the newly created right sibling
         */
        protected InteriorNode split()
        {
            Pair<Long, InteriorNode> splitResult = splitBlock();
            Long middleValue = splitResult.left;
            InteriorNode sibling = splitResult.right;
            InteriorNode leftChild = null;

            // create a new root if necessary
            if (parent == null)
            {
                parent = new InteriorNode();
                root = parent;
                sibling.parent = parent;
                leftChild = this;
                numBlocks++;
            }

            parent.add(middleValue, leftChild, sibling);

            return sibling;
        }

        /**
         * Moves the tokens/children above the split point into a new right sibling
         * and trims them from this node; the token at the split point itself is
         * returned as the separator to promote.
         */
        protected Pair<Long, InteriorNode> splitBlock()
        {
            final int splitPosition = TOKENS_PER_BLOCK - 2;
            InteriorNode sibling = new InteriorNode();
            sibling.parent = parent;
            next = sibling;

            Long middleValue = tokens.get(splitPosition);

            for (int i = splitPosition; i < TOKENS_PER_BLOCK; i++)
            {
                if (i != TOKENS_PER_BLOCK && i != splitPosition)
                {
                    long token = tokens.get(i);
                    sibling.updateTokenRange(token);
                    sibling.tokens.add(token);
                }

                Node child = children.get(i + 1);
                child.parent = sibling;
                sibling.children.add(child);
                sibling.position++;
            }

            // remove in reverse so earlier indices stay valid while trimming
            for (int i = TOKENS_PER_BLOCK; i >= splitPosition; i--)
            {
                if (i != TOKENS_PER_BLOCK)
                    tokens.remove(i);

                if (i != splitPosition)
                    children.remove(i);
            }

            nodeMinToken = smallestToken();
            nodeMaxToken = tokens.get(tokens.size() - 1);
            numBlocks++;

            return Pair.create(middleValue, sibling);
        }

        protected boolean isFull()
        {
            return (position >= TOKENS_PER_BLOCK + 1);
        }

        private void serializeTokens(ByteBuffer buf)
        {
            for (Long token : tokens)
                buf.putLong(token);
        }


        private void serializeChildOffsets(long childBlockIndex, ByteBuffer buf)
        {
            // children are written contiguously starting at block #childBlockIndex,
            // so their file offsets are consecutive multiples of BLOCK_BYTES
            for (int i = 0; i < children.size(); i++)
                buf.putLong((childBlockIndex + i) * BLOCK_BYTES);
        }
    }

    /** Walks the sibling chain (next pointers) across one level of the tree. */
    public static class LevelIterator extends AbstractIterator<Node>
    {
        private Node currentNode;

        LevelIterator(Node first)
        {
            currentNode = first;
        }

        public Node computeNext()
        {
            if (currentNode == null)
                return endOfData();

            Node returnNode = currentNode;
            currentNode = returnNode.next;

            return returnNode;
        }


    }

    /** Flattens the leaf level into an ordered stream of (token, offsets) pairs. */
    public static class TokenIterator extends AbstractIterator<Pair<Long, LongSet>>
    {
        private Iterator<Node> levelIterator;
        private Iterator<Map.Entry<Long, LongSet>> currentIterator;

        TokenIterator(Iterator<Node> level)
        {
            levelIterator = level;
            if (levelIterator.hasNext())
                currentIterator = ((Leaf) levelIterator.next()).tokenIterator();
        }

        public Pair<Long, LongSet> computeNext()
        {
            if (currentIterator != null && currentIterator.hasNext())
            {
                Map.Entry<Long, LongSet> next = currentIterator.next();
                return Pair.create(next.getKey(), next.getValue());
            }
            else
            {
                if (!levelIterator.hasNext())
                    return endOfData();
                else
                {
                    // advance to the next leaf in the chain and retry
                    currentIterator = ((Leaf) levelIterator.next()).tokenIterator();
                    return computeNext();
                }
            }

        }
    }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/exceptions/TimeQuotaExceededException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/exceptions/TimeQuotaExceededException.java b/src/java/org/apache/cassandra/index/sasi/exceptions/TimeQuotaExceededException.java new file mode 100644 index 0000000..af577dc --- /dev/null +++ b/src/java/org/apache/cassandra/index/sasi/exceptions/TimeQuotaExceededException.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.index.sasi.exceptions; + +public class TimeQuotaExceededException extends RuntimeException +{} http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java b/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java new file mode 100644 index 0000000..cf7f3a5 --- /dev/null +++ b/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.index.sasi.memory;

import java.nio.ByteBuffer;

import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.index.sasi.conf.ColumnIndex;
import org.apache.cassandra.index.sasi.disk.Token;
import org.apache.cassandra.index.sasi.plan.Expression;
import org.apache.cassandra.index.sasi.utils.RangeIterator;
import org.apache.cassandra.index.sasi.utils.TypeUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Memtable-resident side of a SASI column index: routes writes and searches
 * to the appropriate in-memory index implementation for the column.
 */
public class IndexMemtable
{
    private static final Logger logger = LoggerFactory.getLogger(IndexMemtable.class);

    // always non-null: assigned unconditionally in the constructor
    private final MemIndex index;

    public IndexMemtable(ColumnIndex columnIndex)
    {
        this.index = MemIndex.forColumn(columnIndex.keyValidator(), columnIndex);
    }

    /**
     * Index the given column value under the given partition key.
     *
     * @param key   partition key owning the value
     * @param value raw column value; ignored if null/empty
     * @return estimated heap bytes consumed by the insertion, or 0 when the
     *         value was empty or could not be up-cast to the column validator
     */
    public long index(DecoratedKey key, ByteBuffer value)
    {
        if (value == null || value.remaining() == 0)
            return 0;

        AbstractType<?> validator = index.columnIndex.getValidator();
        if (!TypeUtil.isValid(value, validator))
        {
            int size = value.remaining();
            // try to widen the value (e.g. int -> bigint) before giving up
            if ((value = TypeUtil.tryUpcast(value, validator)) == null)
            {
                logger.error("Can't add column {} to index for key: {}, value size {} bytes, validator: {}.",
                             index.columnIndex.getColumnName(),
                             index.columnIndex.keyValidator().getString(key.getKey()),
                             size,
                             validator);
                return 0;
            }
        }

        return index.add(key, value);
    }

    public RangeIterator<Long, Token> search(Expression expression)
    {
        // FIX(review): the original guarded with 'index == null ? null : ...',
        // but 'index' is final and always assigned in the constructor, so the
        // null branch was dead code.
        return index.search(expression);
    }
}
+ */ +package org.apache.cassandra.index.sasi.memory; + +import java.io.IOException; +import java.util.Iterator; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentSkipListSet; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.index.sasi.disk.Token; +import org.apache.cassandra.index.sasi.utils.AbstractIterator; +import org.apache.cassandra.index.sasi.utils.CombinedValue; +import org.apache.cassandra.index.sasi.utils.RangeIterator; + +import com.google.common.collect.PeekingIterator; + +public class KeyRangeIterator extends RangeIterator<Long, Token> +{ + private final DKIterator iterator; + + public KeyRangeIterator(ConcurrentSkipListSet<DecoratedKey> keys) + { + super((Long) keys.first().getToken().getTokenValue(), (Long) keys.last().getToken().getTokenValue(), keys.size()); + this.iterator = new DKIterator(keys.iterator()); + } + + protected Token computeNext() + { + return iterator.hasNext() ? new DKToken(iterator.next()) : endOfData(); + } + + protected void performSkipTo(Long nextToken) + { + while (iterator.hasNext()) + { + DecoratedKey key = iterator.peek(); + if (Long.compare((long) key.getToken().getTokenValue(), nextToken) >= 0) + break; + + // consume smaller key + iterator.next(); + } + } + + public void close() throws IOException + {} + + private static class DKIterator extends AbstractIterator<DecoratedKey> implements PeekingIterator<DecoratedKey> + { + private final Iterator<DecoratedKey> keys; + + public DKIterator(Iterator<DecoratedKey> keys) + { + this.keys = keys; + } + + protected DecoratedKey computeNext() + { + return keys.hasNext() ? 
keys.next() : endOfData(); + } + } + + private static class DKToken extends Token + { + private final SortedSet<DecoratedKey> keys; + + public DKToken(final DecoratedKey key) + { + super((long) key.getToken().getTokenValue()); + + keys = new TreeSet<DecoratedKey>(DecoratedKey.comparator) + {{ + add(key); + }}; + } + + public void merge(CombinedValue<Long> other) + { + if (!(other instanceof Token)) + return; + + Token o = (Token) other; + assert o.get().equals(token); + + if (o instanceof DKToken) + { + keys.addAll(((DKToken) o).keys); + } + else + { + for (DecoratedKey key : o) + keys.add(key); + } + } + + public Iterator<DecoratedKey> iterator() + { + return keys.iterator(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java new file mode 100644 index 0000000..22d6c9e --- /dev/null +++ b/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.index.sasi.memory; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.index.sasi.conf.ColumnIndex; +import org.apache.cassandra.index.sasi.disk.Token; +import org.apache.cassandra.index.sasi.plan.Expression; +import org.apache.cassandra.index.sasi.utils.RangeIterator; +import org.apache.cassandra.db.marshal.AbstractType; + +import org.github.jamm.MemoryMeter; + +public abstract class MemIndex +{ + protected final AbstractType<?> keyValidator; + protected final ColumnIndex columnIndex; + + protected MemIndex(AbstractType<?> keyValidator, ColumnIndex columnIndex) + { + this.keyValidator = keyValidator; + this.columnIndex = columnIndex; + } + + public abstract long add(DecoratedKey key, ByteBuffer value); + public abstract RangeIterator<Long, Token> search(Expression expression); + + public static MemIndex forColumn(AbstractType<?> keyValidator, ColumnIndex columnIndex) + { + return columnIndex.isLiteral() + ? new TrieMemIndex(keyValidator, columnIndex) + : new SkipListMemIndex(keyValidator, columnIndex); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java new file mode 100644 index 0000000..69b57d0 --- /dev/null +++ b/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.index.sasi.memory;

import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;

import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.index.sasi.conf.ColumnIndex;
import org.apache.cassandra.index.sasi.disk.Token;
import org.apache.cassandra.index.sasi.plan.Expression;
import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
import org.apache.cassandra.index.sasi.utils.RangeIterator;
import org.apache.cassandra.db.marshal.AbstractType;

/**
 * In-memory index over comparable (non-literal) values: a concurrent skip
 * list keyed by term, each term mapping to the set of partition keys that
 * contain it. Supports range and equality searches.
 */
public class SkipListMemIndex extends MemIndex
{
    public static final int CSLM_OVERHEAD = 128; // average overhead of CSLM

    private final ConcurrentSkipListMap<ByteBuffer, ConcurrentSkipListSet<DecoratedKey>> index;

    public SkipListMemIndex(AbstractType<?> keyValidator, ColumnIndex columnIndex)
    {
        super(keyValidator, columnIndex);
        index = new ConcurrentSkipListMap<>(columnIndex.getValidator());
    }

    /**
     * Add the (value, key) association.
     *
     * @return estimated heap bytes consumed: map-node overhead, plus the term
     *         bytes when this thread won the race to insert a new term
     */
    public long add(DecoratedKey key, ByteBuffer value)
    {
        long overhead = CSLM_OVERHEAD; // DKs are shared
        ConcurrentSkipListSet<DecoratedKey> keys = index.get(value);

        if (keys == null)
        {
            ConcurrentSkipListSet<DecoratedKey> newKeys = new ConcurrentSkipListSet<>(DecoratedKey.comparator);
            // putIfAbsent returns null only if our set won the race
            keys = index.putIfAbsent(value, newKeys);
            if (keys == null)
            {
                overhead += CSLM_OVERHEAD + value.remaining();
                keys = newKeys;
            }
        }

        keys.add(key);

        return overhead;
    }

    /**
     * Search for terms within the expression's bounds and union the matching
     * key sets into a single iterator.
     *
     * @throws IllegalArgumentException when the expression has neither bound
     */
    public RangeIterator<Long, Token> search(Expression expression)
    {
        ByteBuffer min = expression.lower == null ? null : expression.lower.value;
        ByteBuffer max = expression.upper == null ? null : expression.upper.value;

        SortedMap<ByteBuffer, ConcurrentSkipListSet<DecoratedKey>> search;

        if (min == null && max == null)
        {
            // FIX(review): the original threw a message-less exception; include
            // context so the failure is diagnosable.
            throw new IllegalArgumentException("Expression must define at least one bound: " + expression);
        }
        if (min != null && max != null)
        {
            search = index.subMap(min, expression.lower.inclusive, max, expression.upper.inclusive);
        }
        else if (min == null)
        {
            search = index.headMap(max, expression.upper.inclusive);
        }
        else
        {
            search = index.tailMap(min, expression.lower.inclusive);
        }

        RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
        search.values().stream()
              .filter(keys -> !keys.isEmpty())
              .forEach(keys -> builder.add(new KeyRangeIterator(keys)));

        return builder.build();
    }
}
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.index.sasi.memory;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;

import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.index.sasi.conf.ColumnIndex;
import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
import org.apache.cassandra.index.sasi.disk.Token;
import org.apache.cassandra.index.sasi.plan.Expression;
import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer;
import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
import org.apache.cassandra.index.sasi.utils.RangeIterator;
import org.apache.cassandra.db.marshal.AbstractType;

import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree;
import com.googlecode.concurrenttrees.suffix.ConcurrentSuffixTree;
import com.googlecode.concurrenttrees.radix.node.concrete.SmartArrayBasedNodeFactory;
import com.googlecode.concurrenttrees.radix.node.Node;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.cassandra.index.sasi.memory.SkipListMemIndex.CSLM_OVERHEAD;

/**
 * In-memory index for literal (string) columns, backed by a concurrent
 * radix trie (PREFIX mode) or suffix trie (CONTAINS mode). Each indexed
 * term maps to the set of partition keys containing it.
 */
public class TrieMemIndex extends MemIndex
{
    private static final Logger logger = LoggerFactory.getLogger(TrieMemIndex.class);

    private final ConcurrentTrie index;

    public TrieMemIndex(AbstractType<?> keyValidator, ColumnIndex columnIndex)
    {
        super(keyValidator, columnIndex);

        switch (columnIndex.getMode().mode)
        {
            case CONTAINS:
                index = new ConcurrentSuffixTrie(columnIndex.getDefinition());
                break;

            case PREFIX:
                index = new ConcurrentPrefixTrie(columnIndex.getDefinition());
                break;

            default:
                throw new IllegalStateException("Unsupported mode: " + columnIndex.getMode().mode);
        }
    }

    /**
     * Analyze the value into terms and add each term -> key association.
     * Terms at or above the on-disk maximum size are skipped (with a hint to
     * enable analysis for the column).
     */
    public long add(DecoratedKey key, ByteBuffer value)
    {
        AbstractAnalyzer analyzer = columnIndex.getAnalyzer();
        analyzer.reset(value.duplicate());

        long accumulated = 0;
        while (analyzer.hasNext())
        {
            ByteBuffer term = analyzer.next();

            if (term.remaining() >= OnDiskIndexBuilder.MAX_TERM_SIZE)
            {
                logger.info("Can't add term of column {} to index for key: {}, term size {} bytes, max allowed size {} bytes, use analyzed = true (if not yet set) for that column.",
                            columnIndex.getColumnName(),
                            keyValidator.getString(key.getKey()),
                            term.remaining(),
                            OnDiskIndexBuilder.MAX_TERM_SIZE);
                continue;
            }

            accumulated += index.add(columnIndex.getValidator().getString(term), key);
        }

        return accumulated;
    }

    public RangeIterator<Long, Token> search(Expression expression)
    {
        return index.search(expression);
    }

    /** Common trie behavior; concrete subclasses pick prefix vs suffix matching. */
    private static abstract class ConcurrentTrie
    {
        public static final SizeEstimatingNodeFactory NODE_FACTORY = new SizeEstimatingNodeFactory();

        protected final ColumnDefinition definition;

        public ConcurrentTrie(ColumnDefinition column)
        {
            definition = column;
        }

        /** Add the (term, key) association; returns estimated heap bytes consumed. */
        public long add(String value, DecoratedKey key)
        {
            long overhead = CSLM_OVERHEAD;
            ConcurrentSkipListSet<DecoratedKey> keys = get(value);
            if (keys == null)
            {
                ConcurrentSkipListSet<DecoratedKey> candidate = new ConcurrentSkipListSet<>(DecoratedKey.comparator);
                keys = putIfAbsent(value, candidate);
                if (keys == null)
                {
                    overhead += CSLM_OVERHEAD + value.length();
                    keys = candidate;
                }
            }

            keys.add(key);

            // get and reset new memory size allocated by current thread
            overhead += NODE_FACTORY.currentUpdateSize();
            NODE_FACTORY.reset();

            return overhead;
        }

        public RangeIterator<Long, Token> search(Expression expression)
        {
            assert expression.getOp() == Expression.Op.EQ; // means that min == max

            ByteBuffer prefix = expression.lower == null ? null : expression.lower.value;

            // NOTE(review): if expression.lower is null, getString(null) below
            // would likely NPE — presumably EQ expressions always carry a lower
            // bound; confirm against callers.
            Iterable<ConcurrentSkipListSet<DecoratedKey>> matches = search(definition.cellValueType().getString(prefix));

            RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
            for (ConcurrentSkipListSet<DecoratedKey> keys : matches)
            {
                if (!keys.isEmpty())
                    builder.add(new KeyRangeIterator(keys));
            }

            return builder.build();
        }

        protected abstract ConcurrentSkipListSet<DecoratedKey> get(String value);
        protected abstract Iterable<ConcurrentSkipListSet<DecoratedKey>> search(String value);
        protected abstract ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> key);
    }

    /** Radix-tree trie: matches terms by prefix. */
    protected static class ConcurrentPrefixTrie extends ConcurrentTrie
    {
        private final ConcurrentRadixTree<ConcurrentSkipListSet<DecoratedKey>> trie;

        private ConcurrentPrefixTrie(ColumnDefinition column)
        {
            super(column);
            trie = new ConcurrentRadixTree<>(NODE_FACTORY);
        }

        public ConcurrentSkipListSet<DecoratedKey> get(String value)
        {
            return trie.getValueForExactKey(value);
        }

        public ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> newKeys)
        {
            return trie.putIfAbsent(value, newKeys);
        }

        public Iterable<ConcurrentSkipListSet<DecoratedKey>> search(String value)
        {
            return trie.getValuesForKeysStartingWith(value);
        }
    }

    /** Suffix-tree trie: matches terms by containment. */
    protected static class ConcurrentSuffixTrie extends ConcurrentTrie
    {
        private final ConcurrentSuffixTree<ConcurrentSkipListSet<DecoratedKey>> trie;

        private ConcurrentSuffixTrie(ColumnDefinition column)
        {
            super(column);
            trie = new ConcurrentSuffixTree<>(NODE_FACTORY);
        }

        public ConcurrentSkipListSet<DecoratedKey> get(String value)
        {
            return trie.getValueForExactKey(value);
        }

        public ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> newKeys)
        {
            return trie.putIfAbsent(value, newKeys);
        }

        public Iterable<ConcurrentSkipListSet<DecoratedKey>> search(String value)
        {
            return trie.getValuesForKeysContaining(value);
        }
    }

    // This relies on the fact that all of the tree updates are done under exclusive write lock,
    // method would overestimate in certain circumstances e.g. when nodes are replaced in place,
    // but it's still better comparing to underestimate since it gives more breathing room for other memory users.
    private static class SizeEstimatingNodeFactory extends SmartArrayBasedNodeFactory
    {
        private final ThreadLocal<Long> updateSize = ThreadLocal.withInitial(() -> 0L);

        public Node createNode(CharSequence edgeCharacters, Object value, List<Node> childNodes, boolean isRoot)
        {
            Node node = super.createNode(edgeCharacters, value, childNodes, isRoot);
            updateSize.set(updateSize.get() + measure(node));
            return node;
        }

        public long currentUpdateSize()
        {
            return updateSize.get();
        }

        public void reset()
        {
            updateSize.set(0L);
        }

        /** Rough per-node heap estimate based on concurrent-trees node layouts. */
        private long measure(Node node)
        {
            // node with max overhead is CharArrayNodeLeafWithValue = 24B
            long overhead = 24;

            // array of chars (2 bytes) + CharSequence overhead
            overhead += 24 + node.getIncomingEdge().length() * 2;

            if (node.getOutgoingEdges() != null)
            {
                // 16 bytes for AtomicReferenceArray
                overhead += 16;
                overhead += 24 * node.getOutgoingEdges().size();
            }

            return overhead;
        }
    }
}
b/src/java/org/apache/cassandra/index/sasi/plan/Expression.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.index.sasi.plan;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer;
import org.apache.cassandra.index.sasi.conf.ColumnIndex;
import org.apache.cassandra.index.sasi.disk.OnDiskIndex;
import org.apache.cassandra.index.sasi.utils.TypeUtil;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.utils.ByteBufferUtil;

import org.apache.commons.lang3.builder.HashCodeBuilder;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A single-column search expression: an operation (EQ / NOT_EQ / RANGE) with
 * optional lower/upper bounds and a list of excluded values, evaluated against
 * a SASI column index.
 */
public class Expression
{
    private static final Logger logger = LoggerFactory.getLogger(Expression.class);

    public enum Op
    {
        EQ, NOT_EQ, RANGE
    }

    private final QueryController controller;

    public final AbstractAnalyzer analyzer;

    public final ColumnIndex index;
    public final AbstractType<?> validator;
    public final boolean isLiteral;

    @VisibleForTesting
    protected Op operation;

    public Bound lower, upper;
    public List<ByteBuffer> exclusions = new ArrayList<>();

    public Expression(Expression other)
    {
        this(other.controller, other.index);
        operation = other.operation;
    }

    public Expression(QueryController controller, ColumnIndex columnIndex)
    {
        this.controller = controller;
        this.index = columnIndex;
        this.analyzer = columnIndex.getAnalyzer();
        this.validator = columnIndex.getValidator();
        this.isLiteral = columnIndex.isLiteral();
    }

    @VisibleForTesting
    public Expression(String name, AbstractType<?> validator)
    {
        this(null, new ColumnIndex(UTF8Type.instance, ColumnDefinition.regularDef("sasi", "internal", name, validator), null));
    }

    public Expression setLower(Bound newLower)
    {
        lower = newLower == null ? null : new Bound(newLower.value, newLower.inclusive);
        return this;
    }

    public Expression setUpper(Bound newUpper)
    {
        upper = newUpper == null ? null : new Bound(newUpper.value, newUpper.inclusive);
        return this;
    }

    public Expression setOp(Op op)
    {
        this.operation = op;
        return this;
    }

    /**
     * Fold a CQL relation into this expression's operation and bounds.
     * EQ sets lower == upper (same Bound object); inequality operators
     * accumulate into a RANGE.
     */
    public Expression add(Operator op, ByteBuffer value)
    {
        boolean lowerInclusive = false, upperInclusive = false;
        switch (op)
        {
            case EQ:
                lower = new Bound(value, true);
                upper = lower;
                operation = Op.EQ;
                break;

            case NEQ:
                // index expressions are priority sorted
                // and NOT_EQ is the lowest priority, which means that operation type
                // is always going to be set before reaching it in case of RANGE or EQ.
                if (operation == null)
                {
                    operation = Op.NOT_EQ;
                    lower = new Bound(value, true);
                    upper = lower;
                }
                else
                    exclusions.add(value);
                break;

            case LTE:
                upperInclusive = true;
                // fall through
            case LT:
                operation = Op.RANGE;
                upper = new Bound(value, upperInclusive);
                break;

            case GTE:
                lowerInclusive = true;
                // fall through
            case GT:
                operation = Op.RANGE;
                lower = new Bound(value, lowerInclusive);
                break;
        }

        return this;
    }

    public Expression addExclusion(ByteBuffer value)
    {
        exclusions.add(value);
        return this;
    }

    /**
     * Check whether the given column value satisfies this expression
     * (bounds plus exclusions), up-casting the value to the validator
     * type first when necessary.
     */
    public boolean contains(ByteBuffer value)
    {
        if (!TypeUtil.isValid(value, validator))
        {
            int size = value.remaining();
            if ((value = TypeUtil.tryUpcast(value, validator)) == null)
            {
                logger.error("Can't cast value for {} to size accepted by {}, value size is {} bytes.",
                             index.getColumnName(),
                             validator,
                             size);
                return false;
            }
        }

        if (lower != null)
        {
            // suffix check
            if (isLiteral)
            {
                if (!validateStringValue(value, lower.value))
                    return false;
            }
            else
            {
                // range or (not-)equals - (mainly) for numeric values
                int cmp = validator.compare(lower.value, value);

                // in case of (NOT_)EQ lower == upper
                if (operation == Op.EQ || operation == Op.NOT_EQ)
                    return cmp == 0;

                if (cmp > 0 || (cmp == 0 && !lower.inclusive))
                    return false;
            }
        }

        // reference comparison is deliberate: EQ/NOT_EQ set upper to the
        // same Bound object as lower, and that case was handled above
        if (upper != null && lower != upper)
        {
            // string (prefix or suffix) check
            if (isLiteral)
            {
                if (!validateStringValue(value, upper.value))
                    return false;
            }
            else
            {
                // range - mainly for numeric values
                int cmp = validator.compare(upper.value, value);
                if (cmp < 0 || (cmp == 0 && !upper.inclusive))
                    return false;
            }
        }

        // as a last step let's check exclusions for the given field,
        // this covers EQ/RANGE with exclusions.
        for (ByteBuffer term : exclusions)
        {
            if (isLiteral && validateStringValue(value, term))
                return false;
            else if (validator.compare(term, value) == 0)
                return false;
        }

        return true;
    }

    /** True when any analyzed term of columnValue contains requestedValue. */
    private boolean validateStringValue(ByteBuffer columnValue, ByteBuffer requestedValue)
    {
        analyzer.reset(columnValue.duplicate());
        while (analyzer.hasNext())
        {
            ByteBuffer term = analyzer.next();
            if (ByteBufferUtil.contains(term, requestedValue))
                return true;
        }

        return false;
    }

    public Op getOp()
    {
        return operation;
    }

    /** Report progress to the query controller (time-quota enforcement). */
    public void checkpoint()
    {
        if (controller == null)
            return;

        controller.checkpoint();
    }

    public boolean hasLower()
    {
        return lower != null;
    }

    public boolean hasUpper()
    {
        return upper != null;
    }

    public boolean isLowerSatisfiedBy(OnDiskIndex.DataTerm term)
    {
        if (!hasLower())
            return true;

        int cmp = term.compareTo(validator, lower.value, false);
        return cmp > 0 || cmp == 0 && lower.inclusive;
    }

    public boolean isUpperSatisfiedBy(OnDiskIndex.DataTerm term)
    {
        if (!hasUpper())
            return true;

        int cmp = term.compareTo(validator, upper.value, false);
        return cmp < 0 || cmp == 0 && upper.inclusive;
    }

    public boolean isIndexed()
    {
        return index.isIndexed();
    }

    public String toString()
    {
        return String.format("Expression{name: %s, op: %s, lower: (%s, %s), upper: (%s, %s), exclusions: %s}",
                             index.getColumnName(),
                             operation,
                             lower == null ? "null" : validator.getString(lower.value),
                             lower != null && lower.inclusive,
                             upper == null ? "null" : validator.getString(upper.value),
                             upper != null && upper.inclusive,
                             Iterators.toString(Iterators.transform(exclusions.iterator(), validator::getString)));
    }

    public int hashCode()
    {
        return new HashCodeBuilder().append(index.getColumnName())
                                    .append(operation)
                                    .append(validator)
                                    .append(lower).append(upper)
                                    .append(exclusions).build();
    }

    public boolean equals(Object other)
    {
        if (!(other instanceof Expression))
            return false;

        if (this == other)
            return true;

        Expression o = (Expression) other;

        return Objects.equals(index.getColumnName(), o.index.getColumnName())
                && validator.equals(o.validator)
                && operation == o.operation
                && Objects.equals(lower, o.lower)
                && Objects.equals(upper, o.upper)
                && exclusions.equals(o.exclusions);
    }

    /** An inclusive-or-exclusive bound on a single expression. */
    public static class Bound
    {
        public final ByteBuffer value;
        public final boolean inclusive;

        public Bound(ByteBuffer value, boolean inclusive)
        {
            this.value = value;
            this.inclusive = inclusive;
        }

        public boolean equals(Object other)
        {
            if (!(other instanceof Bound))
                return false;

            Bound o = (Bound) other;
            return value.equals(o.value) && inclusive == o.inclusive;
        }

        // FIX(review): the original overrode equals without hashCode, so equal
        // Bounds hashed differently, making Expression.hashCode (which folds
        // lower/upper via HashCodeBuilder) inconsistent with Expression.equals.
        public int hashCode()
        {
            return Objects.hash(value, inclusive);
        }
    }
}