http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java index 07804d6..2210964 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java @@ -20,62 +20,28 @@ package org.apache.cassandra.index.sasi.disk; import java.io.IOException; import java.util.*; -import org.apache.cassandra.io.util.*; -import org.apache.cassandra.utils.*; -import org.apache.cassandra.utils.obs.BitUtil; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.Pair; -public interface TokenTreeBuilder extends Iterable<Pair<Long, KeyOffsets>> -{ - final static int BLOCK_BYTES = 4096; - - final static int LEAF_ENTRY_TYPE_BYTES = Short.BYTES; - final static int TOKEN_OFFSET_BYTES = LEAF_ENTRY_TYPE_BYTES; - final static int LEAF_PARTITON_OFFSET_BYTES = Long.BYTES; - final static int LEAF_ROW_OFFSET_BYTES = Long.BYTES; - - final static int LEAF_PARTITON_OFFSET_PACKED_BYTES = Integer.BYTES; - final static int LEAF_ROW_OFFSET_PACKED_BYTES = Integer.BYTES; - final static int COLLISION_ENTRY_BYTES = LEAF_PARTITON_OFFSET_BYTES + LEAF_ROW_OFFSET_BYTES; - - final static int HEADER_INFO_BYTE_BYTES = Byte.BYTES; - final static int HEADER_TOKEN_COUNT_BYTES = Short.BYTES; - - final static int ROOT_HEADER_MAGIC_SIZE = Short.BYTES; - final static int ROOT_HEADER_TOKEN_COUNT_SIZE = Long.BYTES; +import com.carrotsearch.hppc.LongSet; - // Partitioner token size in bytes - final static int TOKEN_BYTES = Long.BYTES; - - // Leaf entry size in bytes, see {@class SimpleLeafEntry} for a full description - final static int LEAF_ENTRY_BYTES = LEAF_ENTRY_TYPE_BYTES + TOKEN_BYTES + LEAF_PARTITON_OFFSET_BYTES + LEAF_ROW_OFFSET_BYTES; - // Shared header size in bytes, see {@class AbstractTreeBuilder$Header} for a full description - final static int SHARED_HEADER_BYTES = HEADER_INFO_BYTE_BYTES + HEADER_TOKEN_COUNT_BYTES + 2 * TOKEN_BYTES; - // Block header size in bytes, see {@class AbstractTreeBuilder$RootHeader} - final static int BLOCK_HEADER_BYTES = BitUtil.nextHighestPowerOfTwo(SHARED_HEADER_BYTES + ROOT_HEADER_MAGIC_SIZE + ROOT_HEADER_TOKEN_COUNT_SIZE + 2 * TOKEN_BYTES); - - // Overflow trailer capacity is currently 8 overflow items. Each overflow item consists of two longs. - final static int OVERFLOW_TRAILER_CAPACITY = 8; - final static int OVERFLOW_TRAILER_BYTES = OVERFLOW_TRAILER_CAPACITY * COLLISION_ENTRY_BYTES;; - final static int TOKENS_PER_BLOCK = (TokenTreeBuilder.BLOCK_BYTES - BLOCK_HEADER_BYTES - OVERFLOW_TRAILER_BYTES) / LEAF_ENTRY_BYTES; - - final static int LEGACY_LEAF_ENTRY_BYTES = Short.BYTES + Short.BYTES + TOKEN_BYTES + Integer.BYTES; - final static int LEGACY_TOKEN_OFFSET_BYTES = 2 * Short.BYTES; - final static byte LAST_LEAF_SHIFT = 1; - - /** - * {@code Header} size in bytes. - */ - final byte ENTRY_TYPE_MASK = 0x03; - final short AB_MAGIC = 0x5A51; - final short AC_MAGIC = 0x7C63; +public interface TokenTreeBuilder extends Iterable<Pair<Long, LongSet>> +{ + int BLOCK_BYTES = 4096; + int BLOCK_HEADER_BYTES = 64; + int OVERFLOW_TRAILER_BYTES = 64; + int OVERFLOW_TRAILER_CAPACITY = OVERFLOW_TRAILER_BYTES / 8; + int TOKENS_PER_BLOCK = (BLOCK_BYTES - BLOCK_HEADER_BYTES - OVERFLOW_TRAILER_BYTES) / 16; + long MAX_OFFSET = (1L << 47) - 1; // 48 bits for (signed) offset + byte LAST_LEAF_SHIFT = 1; + byte SHARED_HEADER_BYTES = 19; + byte ENTRY_TYPE_MASK = 0x03; + short AB_MAGIC = 0x5A51; // note: ordinal positions are used here, do not change order enum EntryType { - SIMPLE, - FACTORED, - PACKED, - OVERFLOW; + SIMPLE, FACTORED, PACKED, OVERFLOW; public static EntryType of(int ordinal) { @@ -95,9 +61,9 @@ public interface TokenTreeBuilder extends Iterable<Pair<Long, KeyOffsets>> } } - void add(Long token, long partitionOffset, long rowOffset); - void add(SortedMap<Long, KeyOffsets> data); - void add(Iterator<Pair<Long, KeyOffsets>> data); + void add(Long token, long keyPosition); + void add(SortedMap<Long, LongSet> data); + void add(Iterator<Pair<Long, LongSet>> data); void add(TokenTreeBuilder ttb); boolean isEmpty();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java b/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java index a7b22f3..e55a806 100644 --- a/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java +++ b/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java @@ -19,14 +19,14 @@ package org.apache.cassandra.index.sasi.memory; import java.nio.ByteBuffer; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.index.sasi.conf.ColumnIndex; -import org.apache.cassandra.index.sasi.disk.*; import org.apache.cassandra.index.sasi.disk.Token; import org.apache.cassandra.index.sasi.plan.Expression; import org.apache.cassandra.index.sasi.utils.RangeIterator; import org.apache.cassandra.index.sasi.utils.TypeUtil; -import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.FBUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +42,7 @@ public class IndexMemtable this.index = MemIndex.forColumn(columnIndex.keyValidator(), columnIndex); } - public long index(RowKey key, ByteBuffer value) + public long index(DecoratedKey key, ByteBuffer value) { if (value == null || value.remaining() == 0) return 0; @@ -55,7 +55,7 @@ public class IndexMemtable { logger.error("Can't add column {} to index for key: {}, value size {}, validator: {}.", index.columnIndex.getColumnName(), - index.columnIndex.keyValidator().getString(key.decoratedKey.getKey()), + index.columnIndex.keyValidator().getString(key.getKey()), FBUtilities.prettyPrintMemory(size), validator); return 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java b/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java index b4365dc..a2f2c0e 100644 --- a/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java +++ b/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java @@ -18,27 +18,28 @@ package org.apache.cassandra.index.sasi.memory; import java.io.IOException; -import java.util.*; +import java.util.Iterator; +import java.util.SortedSet; +import java.util.TreeSet; import java.util.concurrent.ConcurrentSkipListSet; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.index.sasi.disk.*; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.index.sasi.disk.Token; import org.apache.cassandra.index.sasi.utils.AbstractIterator; import org.apache.cassandra.index.sasi.utils.CombinedValue; import org.apache.cassandra.index.sasi.utils.RangeIterator; +import com.carrotsearch.hppc.LongOpenHashSet; +import com.carrotsearch.hppc.LongSet; import com.google.common.collect.PeekingIterator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class KeyRangeIterator extends RangeIterator<Long, Token> { private final DKIterator iterator; - public KeyRangeIterator(ConcurrentSkipListSet<RowKey> keys) + public KeyRangeIterator(ConcurrentSkipListSet<DecoratedKey> keys) { - super((Long) keys.first().decoratedKey.getToken().getTokenValue(), (Long) keys.last().decoratedKey.getToken().getTokenValue(), keys.size()); + super((Long) keys.first().getToken().getTokenValue(), (Long) keys.last().getToken().getTokenValue(), keys.size()); this.iterator = new DKIterator(keys.iterator()); } @@ -51,8 +52,8 @@ public class KeyRangeIterator extends RangeIterator<Long, Token> { while (iterator.hasNext()) { - RowKey key = iterator.peek(); - if (Long.compare((Long) key.decoratedKey.getToken().getTokenValue(), nextToken) >= 0) + DecoratedKey key = iterator.peek(); + if (Long.compare((long) key.getToken().getTokenValue(), nextToken) >= 0) break; // consume smaller key @@ -63,16 +64,16 @@ public class KeyRangeIterator extends RangeIterator<Long, Token> public void close() throws IOException {} - private static class DKIterator extends AbstractIterator<RowKey> implements PeekingIterator<RowKey> + private static class DKIterator extends AbstractIterator<DecoratedKey> implements PeekingIterator<DecoratedKey> { - private final Iterator<RowKey> keys; + private final Iterator<DecoratedKey> keys; - public DKIterator(Iterator<RowKey> keys) + public DKIterator(Iterator<DecoratedKey> keys) { this.keys = keys; } - protected RowKey computeNext() + protected DecoratedKey computeNext() { return keys.hasNext() ? keys.next() : endOfData(); } @@ -80,21 +81,25 @@ public class KeyRangeIterator extends RangeIterator<Long, Token> private static class DKToken extends Token { - private final SortedSet<RowKey> keys; + private final SortedSet<DecoratedKey> keys; - public DKToken(RowKey key) + public DKToken(final DecoratedKey key) { - super((Long) key.decoratedKey.getToken().getTokenValue()); + super((long) key.getToken().getTokenValue()); - keys = new TreeSet<RowKey>(RowKey.COMPARATOR) + keys = new TreeSet<DecoratedKey>(DecoratedKey.comparator) {{ add(key); }}; } - public KeyOffsets getOffsets() + public LongSet getOffsets() { - throw new IllegalStateException("DecoratedKey tokens are used in memtables and do not have on-disk offsets"); + LongSet offsets = new LongOpenHashSet(4); + for (DecoratedKey key : keys) + offsets.add((long) key.getToken().getTokenValue()); + + return offsets; } public void merge(CombinedValue<Long> other) @@ -111,14 +116,14 @@ public class KeyRangeIterator extends RangeIterator<Long, Token> } else { - for (RowKey key : o) + for (DecoratedKey key : o) keys.add(key); } } - public Iterator<RowKey> iterator() + public Iterator<DecoratedKey> iterator() { return keys.iterator(); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java index bfba4cb..cc1eb3f 100644 --- a/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java @@ -19,8 +19,8 @@ package org.apache.cassandra.index.sasi.memory; import java.nio.ByteBuffer; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.index.sasi.conf.ColumnIndex; -import org.apache.cassandra.index.sasi.disk.*; import org.apache.cassandra.index.sasi.disk.Token; import org.apache.cassandra.index.sasi.plan.Expression; import org.apache.cassandra.index.sasi.utils.RangeIterator; @@ -37,7 +37,7 @@ public abstract class MemIndex this.columnIndex = columnIndex; } - public abstract long add(RowKey key, ByteBuffer value); + public abstract long add(DecoratedKey key, ByteBuffer value); public abstract RangeIterator<Long, Token> search(Expression expression); public static MemIndex forColumn(AbstractType<?> keyValidator, ColumnIndex columnIndex) http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java index 9c3562a..69b57d0 100644 --- a/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java @@ -22,8 +22,8 @@ import java.util.*; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListSet; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.index.sasi.conf.ColumnIndex; -import org.apache.cassandra.index.sasi.disk.*; import org.apache.cassandra.index.sasi.disk.Token; import org.apache.cassandra.index.sasi.plan.Expression; import org.apache.cassandra.index.sasi.utils.RangeUnionIterator; @@ -34,7 +34,7 @@ public class SkipListMemIndex extends MemIndex { public static final int CSLM_OVERHEAD = 128; // average overhead of CSLM - private final ConcurrentSkipListMap<ByteBuffer, ConcurrentSkipListSet<RowKey>> index; + private final ConcurrentSkipListMap<ByteBuffer, ConcurrentSkipListSet<DecoratedKey>> index; public SkipListMemIndex(AbstractType<?> keyValidator, ColumnIndex columnIndex) { @@ -42,14 +42,14 @@ public class SkipListMemIndex extends MemIndex index = new ConcurrentSkipListMap<>(columnIndex.getValidator()); } - public long add(RowKey key, ByteBuffer value) + public long add(DecoratedKey key, ByteBuffer value) { long overhead = CSLM_OVERHEAD; // DKs are shared - ConcurrentSkipListSet<RowKey> keys = index.get(value); + ConcurrentSkipListSet<DecoratedKey> keys = index.get(value); if (keys == null) { - ConcurrentSkipListSet<RowKey> newKeys = new ConcurrentSkipListSet<>(); + ConcurrentSkipListSet<DecoratedKey> newKeys = new ConcurrentSkipListSet<>(DecoratedKey.comparator); keys = index.putIfAbsent(value, newKeys); if (keys == null) { @@ -68,7 +68,7 @@ public class SkipListMemIndex extends MemIndex ByteBuffer min = expression.lower == null ? null : expression.lower.value; ByteBuffer max = expression.upper == null ? null : expression.upper.value; - SortedMap<ByteBuffer, ConcurrentSkipListSet<RowKey>> search; + SortedMap<ByteBuffer, ConcurrentSkipListSet<DecoratedKey>> search; if (min == null && max == null) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java index e1c273d..ca60ac5 100644 --- a/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java @@ -23,8 +23,9 @@ import java.util.List; import java.util.concurrent.ConcurrentSkipListSet; import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.index.sasi.conf.ColumnIndex; -import org.apache.cassandra.index.sasi.disk.*; +import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder; import org.apache.cassandra.index.sasi.disk.Token; import org.apache.cassandra.index.sasi.plan.Expression; import org.apache.cassandra.index.sasi.plan.Expression.Op; @@ -37,7 +38,7 @@ import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree; import com.googlecode.concurrenttrees.suffix.ConcurrentSuffixTree; import com.googlecode.concurrenttrees.radix.node.concrete.SmartArrayBasedNodeFactory; import com.googlecode.concurrenttrees.radix.node.Node; -import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.FBUtilities; import org.slf4j.Logger; @@ -70,7 +71,7 @@ public class TrieMemIndex extends MemIndex } } - public long add(RowKey key, ByteBuffer value) + public long add(DecoratedKey key, ByteBuffer value) { AbstractAnalyzer analyzer = columnIndex.getAnalyzer(); analyzer.reset(value.duplicate()); @@ -84,7 +85,7 @@ public class TrieMemIndex extends MemIndex { logger.info("Can't add term of column {} to index for key: {}, term size {}, max allowed size {}, use analyzed = true (if not yet set) for that column.", columnIndex.getColumnName(), - keyValidator.getString(key.decoratedKey.getKey()), + keyValidator.getString(key.getKey()), FBUtilities.prettyPrintMemory(term.remaining()), FBUtilities.prettyPrintMemory(OnDiskIndexBuilder.MAX_TERM_SIZE)); continue; @@ -112,13 +113,13 @@ public class TrieMemIndex extends MemIndex definition = column; } - public long add(String value, RowKey key) + public long add(String value, DecoratedKey key) { long overhead = CSLM_OVERHEAD; - ConcurrentSkipListSet<RowKey> keys = get(value); + ConcurrentSkipListSet<DecoratedKey> keys = get(value); if (keys == null) { - ConcurrentSkipListSet<RowKey> newKeys = new ConcurrentSkipListSet<>(); + ConcurrentSkipListSet<DecoratedKey> newKeys = new ConcurrentSkipListSet<>(DecoratedKey.comparator); keys = putIfAbsent(value, newKeys); if (keys == null) { @@ -140,10 +141,10 @@ public class TrieMemIndex extends MemIndex { ByteBuffer prefix = expression.lower == null ? null : expression.lower.value; - Iterable<ConcurrentSkipListSet<RowKey>> search = search(expression.getOp(), definition.cellValueType().getString(prefix)); + Iterable<ConcurrentSkipListSet<DecoratedKey>> search = search(expression.getOp(), definition.cellValueType().getString(prefix)); RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder(); - for (ConcurrentSkipListSet<RowKey> keys : search) + for (ConcurrentSkipListSet<DecoratedKey> keys : search) { if (!keys.isEmpty()) builder.add(new KeyRangeIterator(keys)); @@ -152,14 +153,14 @@ public class TrieMemIndex extends MemIndex return builder.build(); } - protected abstract ConcurrentSkipListSet<RowKey> get(String value); - protected abstract Iterable<ConcurrentSkipListSet<RowKey>> search(Op operator, String value); - protected abstract ConcurrentSkipListSet<RowKey> putIfAbsent(String value, ConcurrentSkipListSet<RowKey> key); + protected abstract ConcurrentSkipListSet<DecoratedKey> get(String value); + protected abstract Iterable<ConcurrentSkipListSet<DecoratedKey>> search(Op operator, String value); + protected abstract ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> key); } protected static class ConcurrentPrefixTrie extends ConcurrentTrie { - private final ConcurrentRadixTree<ConcurrentSkipListSet<RowKey>> trie; + private final ConcurrentRadixTree<ConcurrentSkipListSet<DecoratedKey>> trie; private ConcurrentPrefixTrie(ColumnDefinition column) { @@ -167,23 +168,23 @@ public class TrieMemIndex extends MemIndex trie = new ConcurrentRadixTree<>(NODE_FACTORY); } - public ConcurrentSkipListSet<RowKey> get(String value) + public ConcurrentSkipListSet<DecoratedKey> get(String value) { return trie.getValueForExactKey(value); } - public ConcurrentSkipListSet<RowKey> putIfAbsent(String value, ConcurrentSkipListSet<RowKey> newKeys) + public ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> newKeys) { return trie.putIfAbsent(value, newKeys); } - public Iterable<ConcurrentSkipListSet<RowKey>> search(Op operator, String value) + public Iterable<ConcurrentSkipListSet<DecoratedKey>> search(Op operator, String value) { switch (operator) { case EQ: case MATCH: - ConcurrentSkipListSet<RowKey> keys = trie.getValueForExactKey(value); + ConcurrentSkipListSet<DecoratedKey> keys = trie.getValueForExactKey(value); return keys == null ? Collections.emptyList() : Collections.singletonList(keys); case PREFIX: @@ -197,7 +198,7 @@ public class TrieMemIndex extends MemIndex protected static class ConcurrentSuffixTrie extends ConcurrentTrie { - private final ConcurrentSuffixTree<ConcurrentSkipListSet<RowKey>> trie; + private final ConcurrentSuffixTree<ConcurrentSkipListSet<DecoratedKey>> trie; private ConcurrentSuffixTrie(ColumnDefinition column) { @@ -205,23 +206,23 @@ public class TrieMemIndex extends MemIndex trie = new ConcurrentSuffixTree<>(NODE_FACTORY); } - public ConcurrentSkipListSet<RowKey> get(String value) + public ConcurrentSkipListSet<DecoratedKey> get(String value) { return trie.getValueForExactKey(value); } - public ConcurrentSkipListSet<RowKey> putIfAbsent(String value, ConcurrentSkipListSet<RowKey> newKeys) + public ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> newKeys) { return trie.putIfAbsent(value, newKeys); } - public Iterable<ConcurrentSkipListSet<RowKey>> search(Op operator, String value) + public Iterable<ConcurrentSkipListSet<DecoratedKey>> search(Op operator, String value) { switch (operator) { case EQ: case MATCH: - ConcurrentSkipListSet<RowKey> keys = trie.getValueForExactKey(value); + ConcurrentSkipListSet<DecoratedKey> keys = trie.getValueForExactKey(value); return keys == null ? Collections.emptyList() : Collections.singletonList(keys); case SUFFIX: http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java index af4e249..fa1181f 100644 --- a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java +++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java @@ -17,18 +17,15 @@ */ package org.apache.cassandra.index.sasi.plan; -import java.io.IOException; import java.util.*; import java.util.concurrent.TimeUnit; import com.google.common.collect.Sets; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.filter.RowFilter; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.index.Index; import org.apache.cassandra.index.sasi.SASIIndex; @@ -45,12 +42,10 @@ import org.apache.cassandra.index.sasi.utils.RangeUnionIterator; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.Pair; public class QueryController { - private static final Logger logger = LoggerFactory.getLogger(QueryController.class); - private final long executionQuota; private final long executionStart; @@ -99,26 +94,22 @@ public class QueryController return index.isPresent() ? ((SASIIndex) index.get()).getIndex() : null; } - public UnfilteredRowIterator getPartition(DecoratedKey key, NavigableSet<Clustering> clusterings, ReadExecutionController executionController) + + public UnfilteredRowIterator getPartition(DecoratedKey key, ReadExecutionController executionController) { if (key == null) throw new NullPointerException(); - try { - ClusteringIndexFilter filter; - if (clusterings == null) - filter = new ClusteringIndexSliceFilter(Slices.ALL, false); - else - filter = new ClusteringIndexNamesFilter(clusterings, false); - - SinglePartitionReadCommand partition = SinglePartitionReadCommand.create(cfs.metadata, + SinglePartitionReadCommand partition = SinglePartitionReadCommand.create(command.isForThrift(), + cfs.metadata, command.nowInSec(), command.columnFilter(), - command.rowFilter(), + command.rowFilter().withoutExpressions(), DataLimits.NONE, key, - filter); + command.clusteringIndexFilter(key)); + return partition.queryMemtableAndDisk(cfs, executionController); } finally @@ -144,24 +135,20 @@ public class QueryController RangeIterator.Builder<Long, Token> builder = op == OperationType.OR ? RangeUnionIterator.<Long, Token>builder() - : RangeIntersectionIterator.<Long, org.apache.cassandra.index.sasi.disk.Token>builder(); + : RangeIntersectionIterator.<Long, Token>builder(); List<RangeIterator<Long, Token>> perIndexUnions = new ArrayList<>(); for (Map.Entry<Expression, Set<SSTableIndex>> e : getView(op, expressions).entrySet()) { - try (RangeIterator<Long, Token> index = TermIterator.build(e.getKey(), e.getValue())) - { - if (index == null) - continue; + @SuppressWarnings("resource") // RangeIterators are closed by releaseIndexes + RangeIterator<Long, Token> index = TermIterator.build(e.getKey(), e.getValue()); - builder.add(index); - perIndexUnions.add(index); - } - catch (IOException ex) - { - logger.error("Failed to release index: ", ex); - } + if (index == null) + continue; + + builder.add(index); + perIndexUnions.add(index); } resources.put(expressions, perIndexUnions); http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java index ccb369c..4410756 100644 --- a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java +++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java @@ -19,19 +19,16 @@ package org.apache.cassandra.index.sasi.plan; import java.util.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.config.*; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.dht.*; -import org.apache.cassandra.exceptions.*; -import org.apache.cassandra.index.sasi.disk.*; +import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.index.sasi.disk.Token; -import org.apache.cassandra.index.sasi.plan.Operation.*; -import org.apache.cassandra.utils.btree.*; +import org.apache.cassandra.index.sasi.plan.Operation.OperationType; +import org.apache.cassandra.exceptions.RequestTimeoutException; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.AbstractIterator; public class QueryPlan { @@ -71,16 +68,14 @@ public class QueryPlan return new ResultIterator(analyze(), controller, executionController); } - private static class ResultIterator implements UnfilteredPartitionIterator + private static class ResultIterator extends AbstractIterator<UnfilteredRowIterator> implements UnfilteredPartitionIterator { private final AbstractBounds<PartitionPosition> keyRange; private final Operation operationTree; private final QueryController controller; private final ReadExecutionController executionController; - private Iterator<RowKey> currentKeys = null; - private UnfilteredRowIterator nextPartition = null; - private DecoratedKey lastPartitionKey = null; + private Iterator<DecoratedKey> currentKeys = null; public ResultIterator(Operation operationTree, QueryController controller, ReadExecutionController executionController) { @@ -92,152 +87,53 @@ public class QueryPlan operationTree.skipTo((Long) keyRange.left.getToken().getTokenValue()); } - public boolean hasNext() - { - return prepareNext(); - } - - public UnfilteredRowIterator next() - { - if (nextPartition == null) - prepareNext(); - - UnfilteredRowIterator toReturn = nextPartition; - nextPartition = null; - return toReturn; - } - - private boolean prepareNext() + protected UnfilteredRowIterator computeNext() { if (operationTree == null) - return false; - - if (nextPartition != null) - nextPartition.close(); + return endOfData(); for (;;) { if (currentKeys == null || !currentKeys.hasNext()) { if (!operationTree.hasNext()) - return false; + return endOfData(); Token token = operationTree.next(); currentKeys = token.iterator(); } - CFMetaData metadata = controller.metadata(); - BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(metadata.comparator); - // results have static clustering, the whole partition has to be read - boolean fetchWholePartition = false; - - while (true) + while (currentKeys.hasNext()) { - if (!currentKeys.hasNext()) - { - // No more keys for this token. - // If no clusterings were collected yet, exit this inner loop so the operation - // tree iterator can move on to the next token. - // If some clusterings were collected, build an iterator for those rows - // and return. - if ((clusterings.isEmpty() && !fetchWholePartition) || lastPartitionKey == null) - break; - - UnfilteredRowIterator partition = fetchPartition(lastPartitionKey, clusterings.build(), fetchWholePartition); - // Prepare for next partition, reset partition key and clusterings - lastPartitionKey = null; - clusterings = BTreeSet.builder(metadata.comparator); - - if (partition.isEmpty()) - { - partition.close(); - continue; - } - - nextPartition = partition; - return true; - } - - RowKey fullKey = currentKeys.next(); - DecoratedKey key = fullKey.decoratedKey; + DecoratedKey key = currentKeys.next(); if (!keyRange.right.isMinimum() && keyRange.right.compareTo(key) < 0) - return false; + return endOfData(); - if (lastPartitionKey != null && metadata.getKeyValidator().compare(lastPartitionKey.getKey(), key.getKey()) != 0) + try (UnfilteredRowIterator partition = controller.getPartition(key, executionController)) { - UnfilteredRowIterator partition = fetchPartition(lastPartitionKey, clusterings.build(), fetchWholePartition); + Row staticRow = partition.staticRow(); + List<Unfiltered> clusters = new ArrayList<>(); - if (partition.isEmpty()) - partition.close(); - else + while (partition.hasNext()) { - nextPartition = partition; - return true; + Unfiltered row = partition.next(); + if (operationTree.satisfiedBy(row, staticRow, true)) + clusters.add(row); } - } - - lastPartitionKey = key; - - // We fetch whole partition for versions before AC and in case static column index is queried in AC - if (fullKey.clustering == null || fullKey.clustering.clustering().kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING) - fetchWholePartition = true; - else - clusterings.add(fullKey.clustering); + if (!clusters.isEmpty()) + return new PartitionIterator(partition, clusters); + } } } } - private UnfilteredRowIterator fetchPartition(DecoratedKey key, NavigableSet<Clustering> clusterings, boolean fetchWholePartition) - { - if (fetchWholePartition) - clusterings = null; - - try (UnfilteredRowIterator partition = controller.getPartition(key, clusterings, executionController)) - { - Row staticRow = partition.staticRow(); - List<Unfiltered> clusters = new ArrayList<>(); - - while (partition.hasNext()) - { - Unfiltered row = partition.next(); - if (operationTree.satisfiedBy(row, staticRow, true)) - clusters.add(row); - } - - if (!clusters.isEmpty()) - return new PartitionIterator(partition, clusters); - else - return UnfilteredRowIterators.noRowsIterator(partition.metadata(), - partition.partitionKey(), - Rows.EMPTY_STATIC_ROW, - partition.partitionLevelDeletion(), - partition.isReverseOrder()); - } - } - - public void close() - { - if (nextPartition != null) - nextPartition.close(); - } - - public boolean isForThrift() - { - return controller.isForThrift(); - } - - public CFMetaData metadata() - { - return controller.metadata(); - } - private static class PartitionIterator extends AbstractUnfilteredRowIterator { private final Iterator<Unfiltered> rows; - public PartitionIterator(UnfilteredRowIterator partition, Collection<Unfiltered> filteredRows) + public PartitionIterator(UnfilteredRowIterator partition, Collection<Unfiltered> content) { super(partition.metadata(), partition.partitionKey(), @@ -247,7 +143,7 @@ public class QueryPlan partition.isReverseOrder(), partition.stats()); - rows = filteredRows.iterator(); + rows = content.iterator(); } @Override @@ -256,5 +152,21 @@ public class QueryPlan return rows.hasNext() ? rows.next() : endOfData(); } } + + public boolean isForThrift() + { + return controller.isForThrift(); + } + + public CFMetaData metadata() + { + return controller.metadata(); + } + + public void close() + { + FileUtils.closeQuietly(operationTree); + controller.finish(); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java index 35898aa..f0b6bac 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java @@ -46,11 +46,6 @@ public interface SSTableFlushObserver * * @param unfilteredCluster The unfiltered cluster being added to SSTable. */ - default void nextUnfilteredCluster(Unfiltered unfilteredCluster, long position) - { - nextUnfilteredCluster(unfilteredCluster); - } - void nextUnfilteredCluster(Unfiltered unfilteredCluster); /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 40a84dc..56609b3 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -51,7 +51,8 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; -import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -63,7 +64,6 @@ import org.apache.cassandra.io.sstable.metadata.*; import org.apache.cassandra.io.util.*; import org.apache.cassandra.metrics.RestorableMeter; import org.apache.cassandra.metrics.StorageMetrics; -import org.apache.cassandra.net.*; import org.apache.cassandra.schema.CachingParams; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.service.ActiveRepairService; @@ -1781,35 +1781,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS } /** - * Reads Clustering Key from the data file of current sstable. - * - * @param rowPosition start position of given row in the data file - * @return Clustering of the row - * @throws IOException - */ - public Clustering clusteringAt(long rowPosition) throws IOException - { - Clustering clustering; - try (FileDataInput in = dfile.createReader(rowPosition)) - { - if (in.isEOF()) - return null; - - int flags = in.readUnsignedByte(); - int extendedFlags = UnfilteredSerializer.readExtendedFlags(in, flags); - boolean isStatic = UnfilteredSerializer.isStatic(extendedFlags); - - if (isStatic) - clustering = Clustering.STATIC_CLUSTERING; - else - // Since this is an internal call, we don't have to take care of protocol versions that use legacy layout - clustering = Clustering.serializer.deserialize(in, MessagingService.VERSION_30, header.clusteringTypes()); - } - - return clustering; - } - - /** * TODO: Move someplace reusable */ public abstract static class Operator http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index 19e29a8..c3139a3 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -21,7 +21,9 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -161,9 +163,7 @@ public class BigTableWriter extends SSTableWriter return null; long startPosition = beforeAppend(key); - observers.forEach((o) -> { - o.startPartition(key, iwriter.indexFile.position()); - }); + observers.forEach((o) -> o.startPartition(key, iwriter.indexFile.position())); //Reuse the writer for each row columnIndexWriter.reset(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/utils/obs/BitUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/obs/BitUtil.java b/src/java/org/apache/cassandra/utils/obs/BitUtil.java index c438d1b..e04de2b 100644 --- a/src/java/org/apache/cassandra/utils/obs/BitUtil.java +++ b/src/java/org/apache/cassandra/utils/obs/BitUtil.java @@ -20,7 +20,7 @@ package org.apache.cassandra.utils.obs; /** A variety of high efficiency bit twiddling routines. * @lucene.internal */ -public final class BitUtil +final class BitUtil { /** Returns the number of bits set in the long */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/test/data/legacy-sasi/on-disk-sa-int2.db ---------------------------------------------------------------------- diff --git a/test/data/legacy-sasi/on-disk-sa-int2.db b/test/data/legacy-sasi/on-disk-sa-int2.db deleted file mode 100644 index 71f662f..0000000 Binary files a/test/data/legacy-sasi/on-disk-sa-int2.db and /dev/null differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java index fc5afac..0b4e9e2 100644 --- a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java +++ b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java @@ -76,7 +76,6 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; -import com.google.common.collect.Sets; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; @@ -93,7 +92,7 @@ public class SASIIndexTest PARTITIONER = Murmur3Partitioner.instance; } - private static final String KS_NAME = "sasi_index_test"; + private static final String KS_NAME = "sasi"; private static final String CF_NAME = "test_cf"; private static final String CLUSTERING_CF_NAME_1 = "clustering_test_cf_1"; private static final String CLUSTERING_CF_NAME_2 = "clustering_test_cf_2"; @@ -449,15 +448,9 @@ public class SASIIndexTest if (forceFlush) store.forceBlockingFlush(); - CQLTester.assertRowsIgnoringOrder(executeCQL(FTS_CF_NAME, "SELECT * FROM %s.%s WHERE artist LIKE 'lady%%'"), - CQLTester.row(UUID.fromString("1a4abbcd-b5de-4c69-a578-31231e01ff09"), "Lady Gaga", "Poker Face"), - CQLTester.row(UUID.fromString("4f8dc18e-54e6-4e16-b507-c5324b61523b"), "Lady Pank", "Zamki na piasku"), - CQLTester.row(UUID.fromString("eaf294fa-bad5-49d4-8f08-35ba3636a706"), "Lady Pank", "Koncertowa")); - - CQLTester.assertRowsIgnoringOrder(executeCQL(FTS_CF_NAME, "SELECT artist, title FROM %s.%s WHERE artist LIKE 'lady%%'"), - CQLTester.row("Lady Gaga", "Poker Face"), - CQLTester.row("Lady Pank", "Zamki na piasku"), - CQLTester.row("Lady Pank", "Koncertowa")); + final UntypedResultSet results = executeCQL(FTS_CF_NAME, "SELECT * FROM %s.%s WHERE artist LIKE 'lady%%'"); + Assert.assertNotNull(results); + Assert.assertEquals(3, results.size()); } @Test @@ -671,7 +664,7 @@ public class SASIIndexTest add("key21"); }}; - Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys))); + Assert.assertEquals(expected, convert(uniqueKeys)); // now let's test a single equals condition @@ -697,7 +690,7 @@ public class SASIIndexTest add("key21"); }}; - Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys))); + Assert.assertEquals(expected, convert(uniqueKeys)); // now let's test something which is smaller than a single page uniqueKeys = getPaged(store, 4, @@ -711,7 +704,7 @@ public class SASIIndexTest add("key07"); }}; - Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys))); + Assert.assertEquals(expected, convert(uniqueKeys)); // the same but with the page size of 2 to test minimal pagination windows @@ -719,7 +712,7 @@ public class SASIIndexTest buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")), buildExpression(age, Operator.EQ, Int32Type.instance.decompose(36))); - Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys))); + Assert.assertEquals(expected, convert(uniqueKeys)); // and last but not least, test age range query with pagination uniqueKeys = getPaged(store, 4, @@ -743,7 +736,7 @@ public class SASIIndexTest add("key21"); }}; - Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys))); + Assert.assertEquals(expected, convert(uniqueKeys)); Set<String> rows; http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/test/unit/org/apache/cassandra/index/sasi/disk/KeyOffsetsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/KeyOffsetsTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/KeyOffsetsTest.java deleted file mode 100644 index 21ef070..0000000 --- a/test/unit/org/apache/cassandra/index/sasi/disk/KeyOffsetsTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.index.sasi.disk; - -import org.junit.Assert; -import org.junit.Test; - -public class KeyOffsetsTest -{ - @Test - public void testDuplicates() - { - KeyOffsets offsets = new KeyOffsets(); - long[] arr = new long[]{ 1, 2, 3, 4, 5, 6 }; - offsets.put(1, arr); - Assert.assertArrayEquals(offsets.get(1), arr); - offsets.put(1, arr); - Assert.assertArrayEquals(offsets.get(1), arr); - for (long l : arr) - offsets.put(1, l); - Assert.assertArrayEquals(offsets.get(1), arr); - - for (long l : arr) - offsets.put(2, l); - Assert.assertArrayEquals(offsets.get(2), arr); - offsets.put(2, arr); - Assert.assertArrayEquals(offsets.get(2), arr); - offsets.put(2, arr); - Assert.assertArrayEquals(offsets.get(2), arr); - } - -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java index b56cb4e..10dc7a8 100644 --- a/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java +++ b/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java @@ -24,16 +24,13 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import org.apache.cassandra.config.DatabaseDescriptor; -import com.carrotsearch.hppc.cursors.LongObjectCursor; import org.apache.cassandra.cql3.Operator; -import org.apache.cassandra.db.Clustering; -import org.apache.cassandra.db.ClusteringComparator; +import org.apache.cassandra.db.BufferDecoratedKey; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.index.sasi.plan.Expression; import org.apache.cassandra.index.sasi.utils.CombinedTerm; import org.apache.cassandra.index.sasi.utils.CombinedTermIterator; -import org.apache.cassandra.index.sasi.utils.KeyConverter; import org.apache.cassandra.index.sasi.utils.OnDiskIndexIterator; import org.apache.cassandra.index.sasi.utils.RangeIterator; import org.apache.cassandra.db.marshal.AbstractType; @@ -41,8 +38,13 @@ import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.utils.MurmurHash; import org.apache.cassandra.utils.Pair; +import com.carrotsearch.hppc.LongSet; +import com.carrotsearch.hppc.cursors.LongCursor; + +import com.google.common.base.Function; import com.google.common.collect.Iterators; import com.google.common.collect.Sets; @@ -85,7 +87,7 @@ public class OnDiskIndexTest builder.finish(index); - OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, KeyConverter.instance); + OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter()); // first check if we can find exact matches for (Map.Entry<ByteBuffer, TokenTreeBuilder> e : data.entrySet()) @@ -93,13 +95,11 @@ public class OnDiskIndexTest if (UTF8Type.instance.getString(e.getKey()).equals("cat")) continue; // cat is embedded into scat, we'll test it in next section - Assert.assertEquals("Key was: " + UTF8Type.instance.compose(e.getKey()), - convert(e.getValue()), - convert(onDisk.search(expressionFor(UTF8Type.instance, e.getKey())))); + Assert.assertEquals("Key was: " + UTF8Type.instance.compose(e.getKey()), convert(e.getValue()), convert(onDisk.search(expressionFor(UTF8Type.instance, e.getKey())))); } // check that cat returns positions for scat & cat - Assert.assertEquals(convert(1L, 4L), convert(onDisk.search(expressionFor("cat")))); + Assert.assertEquals(convert(1, 4), convert(onDisk.search(expressionFor("cat")))); // random suffix queries Assert.assertEquals(convert(9, 10), convert(onDisk.search(expressionFor("ar")))); @@ -143,7 +143,7 @@ public class OnDiskIndexTest builder.finish(index); - OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, KeyConverter.instance); + OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, new KeyConverter()); for (Map.Entry<ByteBuffer, TokenTreeBuilder> e : data.entrySet()) { @@ -224,14 +224,14 @@ public class OnDiskIndexTest OnDiskIndexBuilder iterTest = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.PREFIX); for (int i = 0; i < iterCheckNums.size(); i++) - iterTest.add(iterCheckNums.get(i), keyAt((long) i), i, i + 5); + iterTest.add(iterCheckNums.get(i), keyAt((long) i), i); File iterIndex = File.createTempFile("sa-iter", ".db"); iterIndex.deleteOnExit(); iterTest.finish(iterIndex); - onDisk = new OnDiskIndex(iterIndex, Int32Type.instance, KeyConverter.instance); + onDisk = new OnDiskIndex(iterIndex, Int32Type.instance, new KeyConverter()); ByteBuffer number = Int32Type.instance.decompose(1); Assert.assertEquals(0, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.ASC, false))); @@ -283,7 +283,7 @@ public class OnDiskIndexTest builder.finish(index); - OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, KeyConverter.instance); + OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter()); Assert.assertEquals(convert(1, 2, 3, 4, 5, 6), convert(onDisk.search(expressionFor("liz")))); Assert.assertEquals(convert(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), convert(onDisk.search(expressionFor("a")))); @@ -315,14 +315,14 @@ public class OnDiskIndexTest final int numIterations = 100000; for (long i = 0; i < numIterations; i++) - builder.add(LongType.instance.decompose(start + i), keyAt(i), i, clusteringOffset(i)); + builder.add(LongType.instance.decompose(start + i), keyAt(i), i); File index = File.createTempFile("on-disk-sa-sparse", "db"); index.deleteOnExit(); builder.finish(index); - OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, KeyConverter.instance); + OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, new KeyConverter()); ThreadLocalRandom random = ThreadLocalRandom.current(); @@ -343,9 +343,9 @@ public class OnDiskIndexTest if (upperInclusive) upperKey += 1; - Set<RowKey> actual = convert(rows); + Set<DecoratedKey> actual = convert(rows); for (long key = lowerKey; key < upperKey; key++) - Assert.assertTrue("key" + key + " wasn't found", actual.contains(new RowKey(keyAt(key), ck(clusteringOffset(key)), CLUSTERING_COMPARATOR))); + Assert.assertTrue("key" + key + " wasn't found", actual.contains(keyAt(key))); Assert.assertEquals((upperKey - lowerKey), actual.size()); } @@ -353,7 +353,7 @@ public class OnDiskIndexTest // let's also explicitly test whole range search RangeIterator<Long, Token> rows = onDisk.search(expressionFor(start, true, start + numIterations, true)); - Set<RowKey> actual = convert(rows); + Set<DecoratedKey> actual = convert(rows); Assert.assertEquals(numIterations, actual.size()); } @@ -380,7 +380,7 @@ public class OnDiskIndexTest builder.finish(index); - OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, KeyConverter.instance); + OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter()); // test whole words first Assert.assertEquals(convert(3, 4, 5, 6, 7, 8, 9, 10), convert(onDisk.search(expressionForNot("Aleksey", "Vijay", "Pavel")))); @@ -424,7 +424,7 @@ public class OnDiskIndexTest builder.finish(index); - OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, KeyConverter.instance); + OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, new KeyConverter()); Assert.assertEquals(convert(1, 2, 4, 5, 6, 7, 8, 9, 10, 11, 12), convert(onDisk.search(expressionForNot(0, 10, 1)))); Assert.assertEquals(convert(1, 2, 4, 5, 7, 9, 10, 11, 12), convert(onDisk.search(expressionForNot(0, 10, 1, 8)))); @@ -439,16 +439,16 @@ public class OnDiskIndexTest final long lower = 0; final long upper = 100000; - OnDiskIndexBuilder builder = new OnDiskIndexBuilder(LongType.instance, LongType.instance, OnDiskIndexBuilder.Mode.SPARSE); + OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.SPARSE); for (long i = lower; i <= upper; i++) - builder.add(LongType.instance.decompose(i), keyAt(i), i, clusteringOffset(i)); + builder.add(LongType.instance.decompose(i), keyAt(i), i); File index = File.createTempFile("on-disk-sa-except-long-ranges", "db"); index.deleteOnExit(); builder.finish(index); - OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, KeyConverter.instance); + OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, new KeyConverter()); ThreadLocalRandom random = ThreadLocalRandom.current(); @@ -503,10 +503,10 @@ public class OnDiskIndexTest private int validateExclusions(OnDiskIndex sa, long lower, long upper, Set<Long> exclusions, boolean checkCount) { int count = 0; - for (RowKey key : convert(sa.search(rangeWithExclusions(lower, true, upper, true, exclusions)))) + for (DecoratedKey key : convert(sa.search(rangeWithExclusions(lower, true, upper, true, exclusions)))) { - long keyId = LongType.instance.compose(key.decoratedKey.getKey()); - Assert.assertFalse("key" + keyId + " is present.", exclusions.contains(keyId)); + String keyId = UTF8Type.instance.getString(key.getKey()).split("key")[1]; + Assert.assertFalse("key" + keyId + " is present.", exclusions.contains(Long.valueOf(keyId))); count++; } @@ -519,49 +519,40 @@ public class OnDiskIndexTest @Test public void testDescriptor() throws Exception { - final Map<ByteBuffer, Pair<RowKey, Long>> data = new HashMap<ByteBuffer, Pair<RowKey, Long>>() + final Map<ByteBuffer, Pair<DecoratedKey, Long>> data = new HashMap<ByteBuffer, Pair<DecoratedKey, Long>>() {{ - put(Int32Type.instance.decompose(5), Pair.create(new RowKey(keyAt(1L), ck(clusteringOffset(1L)), CLUSTERING_COMPARATOR) , 1L)); + put(Int32Type.instance.decompose(5), Pair.create(keyAt(1L), 1L)); }}; OnDiskIndexBuilder builder1 = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.PREFIX); - for (Map.Entry<ByteBuffer, Pair<RowKey, Long>> e : data.entrySet()) + OnDiskIndexBuilder builder2 = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.PREFIX); + for (Map.Entry<ByteBuffer, Pair<DecoratedKey, Long>> e : data.entrySet()) { - DecoratedKey key = e.getValue().left.decoratedKey; + DecoratedKey key = e.getValue().left; Long position = e.getValue().right; - builder1.add(e.getKey(), key, position, clusteringOffset(position)); + builder1.add(e.getKey(), key, position); + builder2.add(e.getKey(), key, position); } File index1 = File.createTempFile("on-disk-sa-int", "db"); - + File index2 = File.createTempFile("on-disk-sa-int2", "db"); index1.deleteOnExit(); + index2.deleteOnExit(); builder1.finish(index1); - OnDiskIndex onDisk1 = new OnDiskIndex(index1, Int32Type.instance, KeyConverter.instance); - ByteBuffer number = Int32Type.instance.decompose(5); - Assert.assertEquals(Collections.singleton(data.get(number).left), convert(onDisk1.search(expressionFor(Operator.EQ, Int32Type.instance, number)))); - Assert.assertEquals(onDisk1.descriptor.version, Descriptor.CURRENT_VERSION); - } - + builder2.finish(new Descriptor(Descriptor.VERSION_AA), index2); - static final String DATA_DIR = "test/data/legacy-sasi/"; - - @Test - public void testLegacyDescriptor() throws Exception - { - final Map<ByteBuffer, Pair<RowKey, Long>> data = new HashMap<ByteBuffer, Pair<RowKey, Long>>() - {{ - put(Int32Type.instance.decompose(5), Pair.create(new RowKey(keyAt(1L), ck(KeyOffsets.NO_OFFSET), CLUSTERING_COMPARATOR) , 1L)); - }}; - - File index2 = new File(DATA_DIR + "on-disk-sa-int2.db"); - OnDiskIndex onDisk2 = new OnDiskIndex(index2, Int32Type.instance, KeyConverter.instance); + OnDiskIndex onDisk1 = new OnDiskIndex(index1, Int32Type.instance, new KeyConverter()); + OnDiskIndex onDisk2 = new OnDiskIndex(index2, Int32Type.instance, new KeyConverter()); ByteBuffer number = Int32Type.instance.decompose(5); + + Assert.assertEquals(Collections.singleton(data.get(number).left), convert(onDisk1.search(expressionFor(Operator.EQ, Int32Type.instance, number)))); Assert.assertEquals(Collections.singleton(data.get(number).left), convert(onDisk2.search(expressionFor(Operator.EQ, Int32Type.instance, number)))); - Assert.assertEquals(onDisk2.descriptor.version, Descriptor.VERSION_AA); + Assert.assertEquals(onDisk1.descriptor.version.version, Descriptor.CURRENT_VERSION); + Assert.assertEquals(onDisk2.descriptor.version.version, Descriptor.VERSION_AA); } @Test @@ -583,7 +574,7 @@ public class OnDiskIndexTest builder.finish(index); - OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, KeyConverter.instance); + OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, new KeyConverter()); OnDiskIndex.OnDiskSuperBlock superBlock = onDisk.dataLevel.getSuperBlock(0); Iterator<Token> iter = superBlock.iterator(); @@ -604,14 +595,14 @@ public class OnDiskIndexTest { OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.SPARSE); for (long i = 0; i < 100000; i++) - builder.add(LongType.instance.decompose(i), keyAt(i), i, clusteringOffset(i)); + builder.add(LongType.instance.decompose(i), keyAt(i), i); File index = File.createTempFile("on-disk-sa-multi-superblock-match", ".db"); index.deleteOnExit(); builder.finish(index); - OnDiskIndex onDiskIndex = new OnDiskIndex(index, LongType.instance, KeyConverter.instance); + OnDiskIndex onDiskIndex = new OnDiskIndex(index, LongType.instance, new KeyConverter()); testSearchRangeWithSuperBlocks(onDiskIndex, 0, 500); testSearchRangeWithSuperBlocks(onDiskIndex, 300, 93456); @@ -626,9 +617,9 @@ public class OnDiskIndexTest } } - public void putAll(SortedMap<Long, KeyOffsets> offsets, TokenTreeBuilder ttb) + public void putAll(SortedMap<Long, LongSet> offsets, TokenTreeBuilder ttb) { - for (Pair<Long, KeyOffsets> entry : ttb) + for (Pair<Long, LongSet> entry : ttb) offsets.put(entry.left, entry.right); } @@ -638,26 +629,26 @@ public class OnDiskIndexTest OnDiskIndexBuilder builderA = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.PREFIX); OnDiskIndexBuilder builderB = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.PREFIX); - TreeMap<Long, TreeMap<Long, KeyOffsets>> expected = new TreeMap<>(); + TreeMap<Long, TreeMap<Long, LongSet>> expected = new TreeMap<>(); for (long i = 0; i <= 100; i++) { - TreeMap<Long, KeyOffsets> offsets = expected.get(i); + TreeMap<Long, LongSet> offsets = expected.get(i); if (offsets == null) expected.put(i, (offsets = new TreeMap<>())); - builderA.add(LongType.instance.decompose(i), keyAt(i), i, clusteringOffset(i)); + builderA.add(LongType.instance.decompose(i), keyAt(i), i); putAll(offsets, keyBuilder(i)); } for (long i = 50; i < 100; i++) { - TreeMap<Long, KeyOffsets> offsets = expected.get(i); + TreeMap<Long, LongSet> offsets = expected.get(i); if (offsets == null) expected.put(i, (offsets = new TreeMap<>())); long position = 100L + i; - builderB.add(LongType.instance.decompose(i), keyAt(position), position, clusteringOffset(position)); + builderB.add(LongType.instance.decompose(i), keyAt(position), position); putAll(offsets, keyBuilder(100L + i)); } @@ -670,19 +661,19 @@ public class OnDiskIndexTest builderA.finish(indexA); builderB.finish(indexB); - OnDiskIndex a = new OnDiskIndex(indexA, LongType.instance, KeyConverter.instance); - OnDiskIndex b = new OnDiskIndex(indexB, LongType.instance, KeyConverter.instance); + OnDiskIndex a = new OnDiskIndex(indexA, LongType.instance, new KeyConverter()); + OnDiskIndex b = new OnDiskIndex(indexB, LongType.instance, new KeyConverter()); RangeIterator<OnDiskIndex.DataTerm, CombinedTerm> union = OnDiskIndexIterator.union(a, b); - TreeMap<Long, TreeMap<Long, KeyOffsets>> actual = new TreeMap<>(); + TreeMap<Long, TreeMap<Long, LongSet>> actual = new TreeMap<>(); while (union.hasNext()) { CombinedTerm term = union.next(); Long composedTerm = LongType.instance.compose(term.getTerm()); - TreeMap<Long, KeyOffsets> offsets = actual.get(composedTerm); + TreeMap<Long, LongSet> offsets = actual.get(composedTerm); if (offsets == null) actual.put(composedTerm, (offsets = new TreeMap<>())); @@ -697,7 +688,7 @@ public class OnDiskIndexTest OnDiskIndexBuilder combined = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.PREFIX); combined.finish(Pair.create(keyAt(0).getKey(), keyAt(100).getKey()), indexC, new CombinedTermIterator(a, b)); - OnDiskIndex c = new OnDiskIndex(indexC, LongType.instance, KeyConverter.instance); + OnDiskIndex c = new OnDiskIndex(indexC, LongType.instance, new KeyConverter()); union = OnDiskIndexIterator.union(c); actual.clear(); @@ -707,7 +698,7 @@ public class OnDiskIndexTest Long composedTerm = LongType.instance.compose(term.getTerm()); - TreeMap<Long, KeyOffsets> offsets = actual.get(composedTerm); + TreeMap<Long, LongSet> offsets = actual.get(composedTerm); if (offsets == null) actual.put(composedTerm, (offsets = new TreeMap<>())); @@ -747,7 +738,7 @@ public class OnDiskIndexTest builder.finish(index); - OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, KeyConverter.instance); + OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter()); // check that lady% return lady gaga (1) and lady pank (3) but not lady of bells(2) Assert.assertEquals(convert(1, 3), convert(onDisk.search(expressionFor("lady", Operator.LIKE_PREFIX)))); @@ -771,7 +762,7 @@ public class OnDiskIndexTest while (tokens.hasNext()) { Token token = tokens.next(); - Iterator<RowKey> keys = token.iterator(); + Iterator<DecoratedKey> keys = token.iterator(); // each of the values should have exactly a single key Assert.assertTrue(keys.hasNext()); @@ -780,7 +771,7 @@ public class OnDiskIndexTest // and it's last should always smaller than current if (lastToken != null) - Assert.assertTrue("last should be less than current", lastToken < token.get()); + Assert.assertTrue("last should be less than current", lastToken.compareTo(token.get()) < 0); lastToken = token.get(); keyCount++; @@ -789,84 +780,61 @@ public class OnDiskIndexTest Assert.assertEquals(end - start, keyCount); } - private static DecoratedKey keyAt(long partitionOffset) - { - return KeyConverter.dk(partitionOffset); - } - - private static Clustering ck(long rowOffset) - { - return KeyConverter.ck(rowOffset); - } - - private TokenTreeBuilder keyBuilder(long... offsets) + private static DecoratedKey keyAt(long rawKey) { - TokenTreeBuilder builder = new DynamicTokenTreeBuilder(); - - for (final long pkOffset : offsets) - { - DecoratedKey k = keyAt(pkOffset); - builder.add((Long) k.getToken().getTokenValue(), pkOffset, clusteringOffset(pkOffset)); - } - - return builder.finish(); + ByteBuffer key = ByteBuffer.wrap(("key" + rawKey).getBytes()); + return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(MurmurHash.hash2_64(key, key.position(), key.remaining(), 0)), key); } - private static long clusteringOffset(long offset) - { - return offset + 100; - } - - private TokenTreeBuilder keyBuilder(Pair<Long, Long>... offsets) + private static TokenTreeBuilder keyBuilder(Long... keys) { TokenTreeBuilder builder = new DynamicTokenTreeBuilder(); - for (final Pair<Long,Long> key : offsets) + for (final Long key : keys) { - DecoratedKey k = keyAt(key.left); - builder.add((Long) k.getToken().getTokenValue(), key.left, key.right); + DecoratedKey dk = keyAt(key); + builder.add((Long) dk.getToken().getTokenValue(), key); } return builder.finish(); } - private static final ClusteringComparator CLUSTERING_COMPARATOR = new ClusteringComparator(BytesType.instance); - - private static Set<RowKey> convert(TokenTreeBuilder offsets) + private static Set<DecoratedKey> convert(TokenTreeBuilder offsets) { - Set<RowKey> result = new HashSet<>(); + Set<DecoratedKey> result = new HashSet<>(); - Iterator<Pair<Long, KeyOffsets>> offsetIter = offsets.iterator(); + Iterator<Pair<Long, LongSet>> offsetIter = offsets.iterator(); while (offsetIter.hasNext()) { - Pair<Long, KeyOffsets> pair = offsetIter.next(); + LongSet v = offsetIter.next().right; - for (LongObjectCursor<long[]> cursor : pair.right) - for (long l : cursor.value) - result.add(new RowKey(keyAt(cursor.key), ck(l), CLUSTERING_COMPARATOR)); + for (LongCursor offset : v) + result.add(keyAt(offset.value)); } return result; } - private static Set<RowKey> convert(long... keyOffsets) + private static Set<DecoratedKey> convert(long... keyOffsets) { - Set<RowKey> result = new HashSet<>(); - for (final long offset : keyOffsets) - result.add(new RowKey(keyAt(offset), ck(clusteringOffset(offset)), CLUSTERING_COMPARATOR)); + Set<DecoratedKey> result = new HashSet<>(); + for (long offset : keyOffsets) + result.add(keyAt(offset)); return result; } - private static Set<RowKey> convert(RangeIterator<Long, Token> results) + private static Set<DecoratedKey> convert(RangeIterator<Long, Token> results) { if (results == null) return Collections.emptySet(); - Set<RowKey> keys = new TreeSet<>(); + Set<DecoratedKey> keys = new TreeSet<>(DecoratedKey.comparator); while (results.hasNext()) - for (RowKey key: results.next()) + { + for (DecoratedKey key : results.next()) keys.add(key); + } return keys; } @@ -940,11 +908,19 @@ public class OnDiskIndexTest private static void addAll(OnDiskIndexBuilder builder, ByteBuffer term, TokenTreeBuilder tokens) { - for (Pair<Long, KeyOffsets> token : tokens) + for (Pair<Long, LongSet> token : tokens) + { + for (long position : token.right.toArray()) + builder.add(term, keyAt(position), position); + } + } + + private static class KeyConverter implements Function<Long, DecoratedKey> + { + @Override + public DecoratedKey apply(Long offset) { - for (LongObjectCursor<long[]> cursor : token.right) - for (long clusteringOffset : cursor.value) - builder.add(term, keyAt(cursor.key), cursor.key, clusteringOffset); + return keyAt(offset); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java index 61e4d67..f19d962 100644 --- a/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java +++ b/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java @@ -22,14 +22,11 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ThreadLocalRandom; -import java.util.function.Supplier; -import com.carrotsearch.hppc.cursors.LongObjectCursor; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.Clustering; -import org.apache.cassandra.db.ClusteringComparator; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Keyspace; @@ -38,9 +35,6 @@ import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.db.rows.BTreeRow; import org.apache.cassandra.db.rows.BufferCell; import org.apache.cassandra.db.rows.Row; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.dht.Murmur3Partitioner; -import org.apache.cassandra.index.sasi.KeyFetcher; import org.apache.cassandra.index.sasi.SASIIndex; import org.apache.cassandra.index.sasi.utils.RangeIterator; import org.apache.cassandra.db.marshal.Int32Type; @@ -76,8 +70,6 @@ public class PerSSTableIndexWriterTest extends SchemaLoader Tables.of(SchemaLoader.sasiCFMD(KS_NAME, CF_NAME)))); } - private static final ClusteringComparator CLUSTERING_COMPARATOR = new ClusteringComparator(LongType.instance); - @Test public void testPartialIndexWrites() throws Exception { @@ -94,20 +86,19 @@ public class PerSSTableIndexWriterTest extends SchemaLoader Descriptor descriptor = Descriptor.fromFilename(cfs.getSSTablePath(directory)); PerSSTableIndexWriter indexWriter = (PerSSTableIndexWriter) sasi.getFlushObserver(descriptor, OperationType.FLUSH); - SortedMap<RowKey, Row> expectedKeys = new TreeMap<>(); + SortedMap<DecoratedKey, Row> expectedKeys = new TreeMap<>(DecoratedKey.comparator); for (int i = 0; i < maxKeys; i++) { ByteBuffer key = ByteBufferUtil.bytes(String.format(keyFormat, i)); - Clustering clustering = Clustering.make(ByteBufferUtil.bytes(i * 1L)); - expectedKeys.put(new RowKey(cfs.metadata.partitioner.decorateKey(key), clustering, CLUSTERING_COMPARATOR), - BTreeRow.singleCellRow(clustering, + expectedKeys.put(cfs.metadata.partitioner.decorateKey(key), + BTreeRow.singleCellRow(Clustering.EMPTY, BufferCell.live(column, timestamp, Int32Type.instance.decompose(i)))); } indexWriter.begin(); - Iterator<Map.Entry<RowKey, Row>> keyIterator = expectedKeys.entrySet().iterator(); + Iterator<Map.Entry<DecoratedKey, Row>> keyIterator = expectedKeys.entrySet().iterator(); long position = 0; Set<String> segments = new HashSet<>(); @@ -119,11 +110,10 @@ public class PerSSTableIndexWriterTest extends SchemaLoader if (!keyIterator.hasNext()) break outer; - Map.Entry<RowKey, Row> key = keyIterator.next(); + Map.Entry<DecoratedKey, Row> key = keyIterator.next(); - indexWriter.startPartition(key.getKey().decoratedKey, position); - indexWriter.nextUnfilteredCluster(key.getValue(), position); - position++; + indexWriter.startPartition(key.getKey(), position++); + indexWriter.nextUnfilteredCluster(key.getValue()); } PerSSTableIndexWriter.Index index = indexWriter.getIndex(column); @@ -144,12 +134,15 @@ public class PerSSTableIndexWriterTest extends SchemaLoader for (String segment : segments) Assert.assertFalse(new File(segment).exists()); - OnDiskIndex index = new OnDiskIndex(new File(indexFile), Int32Type.instance, new FakeKeyFetcher(cfs, keyFormat)); + OnDiskIndex index = new OnDiskIndex(new File(indexFile), Int32Type.instance, keyPosition -> { + ByteBuffer key = ByteBufferUtil.bytes(String.format(keyFormat, keyPosition)); + return cfs.metadata.partitioner.decorateKey(key); + }); Assert.assertEquals(0, UTF8Type.instance.compare(index.minKey(), ByteBufferUtil.bytes(String.format(keyFormat, 0)))); Assert.assertEquals(0, UTF8Type.instance.compare(index.maxKey(), ByteBufferUtil.bytes(String.format(keyFormat, maxKeys - 1)))); - Set<RowKey> actualKeys = new HashSet<>(); + Set<DecoratedKey> actualKeys = new HashSet<>(); int count = 0; for (OnDiskIndex.DataTerm term : index) { @@ -157,7 +150,7 @@ public class PerSSTableIndexWriterTest extends SchemaLoader while (tokens.hasNext()) { - for (RowKey key : tokens.next()) + for (DecoratedKey key : tokens.next()) actualKeys.add(key); } @@ -165,8 +158,8 @@ public class PerSSTableIndexWriterTest extends SchemaLoader } Assert.assertEquals(expectedKeys.size(), actualKeys.size()); - for (RowKey key : expectedKeys.keySet()) - Assert.assertTrue("Key was not present : " + key, actualKeys.contains(key)); + for (DecoratedKey key : expectedKeys.keySet()) + Assert.assertTrue(actualKeys.contains(key)); FileUtils.closeQuietly(index); } @@ -190,14 +183,11 @@ public class PerSSTableIndexWriterTest extends SchemaLoader indexWriter.begin(); indexWriter.indexes.put(column, indexWriter.newIndex(sasi.getIndex())); - populateSegment(cfs.metadata, indexWriter.getIndex(column), new HashMap<Long, KeyOffsets>() + populateSegment(cfs.metadata, indexWriter.getIndex(column), new HashMap<Long, Set<Integer>>() {{ - put(now, new KeyOffsets() {{ put(0, 0); put(1, 1); }}); - put(now + 1, new KeyOffsets() {{ put(2, 2); put(3, 3); }}); - put(now + 2, new KeyOffsets() {{ - put(4, 4); put(5, 5); put(6, 6); - put(7, 7); put(8, 8); put(9, 9); - }}); + put(now, new HashSet<>(Arrays.asList(0, 1))); + put(now + 1, new HashSet<>(Arrays.asList(2, 3))); + put(now + 2, new HashSet<>(Arrays.asList(4, 5, 6, 7, 8, 9))); }}); Callable<OnDiskIndex> segmentBuilder = indexWriter.getIndex(column).scheduleSegmentFlush(false); @@ -207,21 +197,15 @@ public class PerSSTableIndexWriterTest extends SchemaLoader PerSSTableIndexWriter.Index index = indexWriter.getIndex(column); Random random = ThreadLocalRandom.current(); - Supplier<KeyOffsets> offsetSupplier = () -> new KeyOffsets() {{ - put(random.nextInt(), random.nextInt()); - put(random.nextInt(), random.nextInt()); - put(random.nextInt(), random.nextInt()); - }}; - Set<String> segments = new HashSet<>(); // now let's test multiple correct segments with yield incorrect final segment for (int i = 0; i < 3; i++) { - populateSegment(cfs.metadata, index, new HashMap<Long, KeyOffsets>() + populateSegment(cfs.metadata, index, new HashMap<Long, Set<Integer>>() {{ - put(now, offsetSupplier.get()); - put(now + 1, offsetSupplier.get()); - put(now + 2, offsetSupplier.get()); + put(now, new HashSet<>(Arrays.asList(random.nextInt(), random.nextInt(), random.nextInt()))); + put(now + 1, new HashSet<>(Arrays.asList(random.nextInt(), random.nextInt(), random.nextInt()))); + put(now + 2, new HashSet<>(Arrays.asList(random.nextInt(), random.nextInt(), random.nextInt()))); }}); try @@ -252,56 +236,16 @@ public class PerSSTableIndexWriterTest extends SchemaLoader Assert.assertFalse(new File(index.outputFile).exists()); } - private static void populateSegment(CFMetaData metadata, PerSSTableIndexWriter.Index index, Map<Long, KeyOffsets> data) + private static void populateSegment(CFMetaData metadata, PerSSTableIndexWriter.Index index, Map<Long, Set<Integer>> data) { - for (Map.Entry<Long, KeyOffsets> value : data.entrySet()) + for (Map.Entry<Long, Set<Integer>> value : data.entrySet()) { ByteBuffer term = LongType.instance.decompose(value.getKey()); - for (LongObjectCursor<long[]> cursor : value.getValue()) + for (Integer keyPos : value.getValue()) { - ByteBuffer key = ByteBufferUtil.bytes(String.format("key%06d", cursor.key)); - for (long rowOffset : cursor.value) - { - index.add(term, - metadata.partitioner.decorateKey(key), - ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 1), - ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 1)); - } + ByteBuffer key = ByteBufferUtil.bytes(String.format("key%06d", keyPos)); + index.add(term, metadata.partitioner.decorateKey(key), ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 1)); } } } - - private final class FakeKeyFetcher implements KeyFetcher - { - private final ColumnFamilyStore cfs; - private final String keyFormat; - - public FakeKeyFetcher(ColumnFamilyStore cfs, String keyFormat) - { - this.cfs = cfs; - this.keyFormat = keyFormat; - } - - public DecoratedKey getPartitionKey(long keyPosition) - { - ByteBuffer key = ByteBufferUtil.bytes(String.format(keyFormat, keyPosition)); - return cfs.metadata.partitioner.decorateKey(key); - } - - public Clustering getClustering(long offset) - { - return Clustering.make(ByteBufferUtil.bytes(offset)); - } - - public RowKey getRowKey(long partitionOffset, long rowOffset) - { - return new RowKey(getPartitionKey(partitionOffset), getClustering(rowOffset), CLUSTERING_COMPARATOR); - } - } - - public IPartitioner getPartitioner() - { - return Murmur3Partitioner.instance; - } - }