Updated Branches: refs/heads/trunk 57516e082 -> d12a0d7b0
Secondary index support for collections patch by slebresne; reviewed by iamaleksey for CASSANDRA-4511 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d12a0d7b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d12a0d7b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d12a0d7b Branch: refs/heads/trunk Commit: d12a0d7b0299786bf1d0484f3770bae6a94cb0c9 Parents: 57516e0 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Thu Nov 14 09:17:51 2013 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Tue Dec 3 14:49:02 2013 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/cql3/Cql.g | 4 + .../org/apache/cassandra/cql3/Relation.java | 15 ++- .../cql3/statements/CreateIndexStatement.java | 21 +++- .../cassandra/cql3/statements/Restriction.java | 111 +++++++++++++++++- .../cql3/statements/SelectStatement.java | 71 ++++++++++-- .../apache/cassandra/db/ColumnFamilyStore.java | 2 +- .../apache/cassandra/db/IndexExpression.java | 19 +++- .../cassandra/db/filter/ExtendedFilter.java | 47 ++++++-- .../AbstractSimplePerColumnSecondaryIndex.java | 13 ++- .../db/index/SecondaryIndexSearcher.java | 2 +- .../db/index/composites/CompositesIndex.java | 50 ++++++++- .../CompositesIndexOnCollectionKey.java | 112 +++++++++++++++++++ .../CompositesIndexOnCollectionValue.java | 110 ++++++++++++++++++ .../db/index/composites/CompositesSearcher.java | 21 +++- 15 files changed, 566 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3bc50ac..08c3a67 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -14,6 +14,7 @@ * User-defined types for CQL3 (CASSANDRA-5590) * Use of 
o.a.c.metrics in nodetool (CASSANDRA-5871, 6406) * Batch read from OTC's queue and cleanup (CASSANDRA-1632) + * Secondary index support for collections (CASSANDRA-4511) 2.0.4 http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/cql3/Cql.g ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g index 325d6f6..fb0054d 100644 --- a/src/java/org/apache/cassandra/cql3/Cql.g +++ b/src/java/org/apache/cassandra/cql3/Cql.g @@ -947,6 +947,8 @@ relation[List<Relation> clauses] { $clauses.add(new Relation(name, Relation.Type.IN, marker)); } | name=cident K_IN { Relation rel = Relation.createInRelation($name.id); } '(' ( f1=term { rel.addInValue(f1); } (',' fN=term { rel.addInValue(fN); } )* )? ')' { $clauses.add(rel); } + | name=cident K_CONTAINS { Relation.Type rt = Relation.Type.CONTAINS; } /* (K_KEY { rt = Relation.Type.CONTAINS_KEY })? */ + t=term { $clauses.add(new Relation(name, rt, t)); } | '(' relation[$clauses] ')' ; @@ -1045,6 +1047,7 @@ basic_unreserved_keyword returns [String str] | K_CUSTOM | K_TRIGGER | K_DISTINCT + | K_CONTAINS ) { $str = $k.text; } ; @@ -1101,6 +1104,7 @@ K_DESC: D E S C; K_ALLOW: A L L O W; K_FILTERING: F I L T E R I N G; K_IF: I F; +K_CONTAINS: C O N T A I N S; K_GRANT: G R A N T; K_ALL: A L L; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/cql3/Relation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/Relation.java b/src/java/org/apache/cassandra/cql3/Relation.java index 15ed540..cfcdd54 100644 --- a/src/java/org/apache/cassandra/cql3/Relation.java +++ b/src/java/org/apache/cassandra/cql3/Relation.java @@ -35,7 +35,20 @@ public class Relation public static enum Type { - EQ, LT, LTE, GTE, GT, IN; + EQ, LT, LTE, GTE, GT, IN, CONTAINS, CONTAINS_KEY; + + public boolean 
allowsIndexQuery() + { + switch (this) + { + case EQ: + case CONTAINS: + case CONTAINS_KEY: + return true; + default: + return false; + } + } } private Relation(ColumnIdentifier entity, Type type, Term.Raw value, List<Term.Raw> inValues, boolean onToken) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java index b040121..ae6c15c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java @@ -18,10 +18,13 @@ package org.apache.cassandra.cql3.statements; import java.util.Collections; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.ImmutableMap; + import org.apache.cassandra.auth.Permission; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; @@ -91,9 +94,6 @@ public class CreateIndexStatement extends SchemaAlteringStatement if (cfm.isDense() && cd.kind != ColumnDefinition.Kind.REGULAR) throw new InvalidRequestException(String.format("Secondary index on %s column %s is not yet supported for compact table", cd.kind, columnName)); - if (cd.type.isCollection() && !isCustom) - throw new InvalidRequestException("Indexes on collections are no yet supported"); - if (cd.kind == ColumnDefinition.Kind.PARTITION_KEY && cd.isOnAllComponents()) throw new InvalidRequestException(String.format("Cannot add secondary index to already primarily indexed column %s", columnName)); } @@ -108,11 +108,24 @@ public class CreateIndexStatement extends SchemaAlteringStatement return; if (isCustom) + { cd.setIndexType(IndexType.CUSTOM, 
Collections.singletonMap(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME, indexClass)); + } else if (cfm.hasCompositeComparator()) - cd.setIndexType(IndexType.COMPOSITES, Collections.<String, String>emptyMap()); + { + Map<String, String> options = Collections.emptyMap(); + // For now, we only allow indexing values for collections, but we could later allow + // to also index map keys, so we record that this is the values we index to make our + // lives easier then. + if (cd.type.isCollection()) + options = ImmutableMap.of("index_values", ""); + + cd.setIndexType(IndexType.COMPOSITES, options); + } else + { cd.setIndexType(IndexType.KEYS, Collections.<String, String>emptyMap()); + } cd.setIndexName(indexName); cfm.addDefaultIndexNames(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/cql3/statements/Restriction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/Restriction.java b/src/java/org/apache/cassandra/cql3/statements/Restriction.java index 9119a9d..b6f900c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/Restriction.java +++ b/src/java/org/apache/cassandra/cql3/statements/Restriction.java @@ -37,8 +37,9 @@ public interface Restriction public boolean isSlice(); public boolean isEQ(); public boolean isIN(); + public boolean isContains(); - // Only supported for EQ and IN, but it's convenient to have here + // Not supported by Slice, but it's convenient to have here public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException; public static class EQ implements Restriction @@ -72,6 +73,11 @@ public interface Restriction return false; } + public boolean isContains() + { + return false; + } + public boolean isOnToken() { return onToken; @@ -107,6 +113,11 @@ public interface Restriction return false; } + public boolean isContains() + { + return false; + } + public boolean isIN() { return true; @@ -210,6 
+221,11 @@ public interface Restriction return false; } + public boolean isContains() + { + return false; + } + public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException { throw new UnsupportedOperationException(); @@ -302,4 +318,97 @@ public interface Restriction onToken ? "*" : ""); } } + + // This holds both CONTAINS and CONTAINS_KEY restriction because we might want to have both of them. + public static class Contains implements Restriction + { + private List<Term> values; // for CONTAINS + private List<Term> keys; // for CONTAINS_KEY + + public boolean hasContains() + { + return values != null; + } + + public boolean hasContainsKey() + { + return keys != null; + } + + public void add(Term t, boolean isKey) + { + if (isKey) + addKey(t); + else + addValue(t); + } + + public void addValue(Term t) + { + if (values == null) + values = new ArrayList<>(); + values.add(t); + } + + public void addKey(Term t) + { + if (keys == null) + keys = new ArrayList<>(); + keys.add(t); + } + + public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException + { + if (values == null) + return Collections.emptyList(); + + List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(values.size()); + for (Term value : values) + buffers.add(value.bindAndGet(variables)); + return buffers; + } + + public List<ByteBuffer> keys(List<ByteBuffer> variables) throws InvalidRequestException + { + if (keys == null) + return Collections.emptyList(); + + List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(keys.size()); + for (Term value : keys) + buffers.add(value.bindAndGet(variables)); + return buffers; + } + + public boolean isSlice() + { + return false; + } + + public boolean isEQ() + { + return false; + } + + public boolean isIN() + { + return false; + } + + public boolean isContains() + { + return true; + } + + public boolean isOnToken() + { + return false; + } + + + @Override + public String toString() + { + return 
String.format("CONTAINS(values=%s, keys=%s)", values, keys); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 344e926..62ebd21 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -756,15 +756,25 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache { if (slice.hasBound(b)) { - ByteBuffer value = slice.bound(b, variables); - if (value == null) - throw new InvalidRequestException(String.format("Unsupported null value for indexed column %s", def.name)); - if (value.remaining() > 0xFFFF) - throw new InvalidRequestException("Index expression values may not be larger than 64K"); + ByteBuffer value = validateIndexedValue(def, slice.bound(b, variables)); expressions.add(new IndexExpression(def.name.bytes, slice.getIndexOperator(b), value)); } } } + else if (restriction.isContains()) + { + Restriction.Contains contains = (Restriction.Contains)restriction; + for (ByteBuffer value : contains.values(variables)) + { + validateIndexedValue(def, value); + expressions.add(new IndexExpression(def.name.bytes, IndexExpression.Operator.CONTAINS, value)); + } + for (ByteBuffer key : contains.keys(variables)) + { + validateIndexedValue(def, key); + expressions.add(new IndexExpression(def.name.bytes, IndexExpression.Operator.CONTAINS_KEY, key)); + } + } else { List<ByteBuffer> values = restriction.values(variables); @@ -772,17 +782,21 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache if (values.size() != 1) throw new InvalidRequestException("IN restrictions are not supported on indexed columns"); - ByteBuffer 
value = values.get(0); - if (value == null) - throw new InvalidRequestException(String.format("Unsupported null value for indexed column %s", def.name)); - if (value.remaining() > 0xFFFF) - throw new InvalidRequestException("Index expression values may not be larger than 64K"); + ByteBuffer value = validateIndexedValue(def, values.get(0)); expressions.add(new IndexExpression(def.name.bytes, IndexExpression.Operator.EQ, value)); } } return expressions; } + private static ByteBuffer validateIndexedValue(ColumnDefinition def, ByteBuffer value) throws InvalidRequestException + { + if (value == null) + throw new InvalidRequestException(String.format("Unsupported null value for indexed column %s", def.name)); + if (value.remaining() > 0xFFFF) + throw new InvalidRequestException("Index expression values may not be larger than 64K"); + return value; + } private Iterable<Column> columnsInOrder(final ColumnFamily cf, final List<ByteBuffer> variables) throws InvalidRequestException { @@ -1109,7 +1123,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache } stmt.restrictedColumns.add(def); - if (def.isIndexed() && rel.operator() == Relation.Type.EQ) + if (def.isIndexed() && rel.operator().allowsIndexQuery()) { hasQueriableIndex = true; if (def.kind == ColumnDefinition.Kind.CLUSTERING_COLUMN) @@ -1490,10 +1504,45 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache ((Restriction.Slice)restriction).setBound(def.name, newRel.operator(), t); } break; + case CONTAINS_KEY: + if (!(receiver.type instanceof MapType)) + throw new InvalidRequestException(String.format("Cannot use CONTAINS_KEY on non-map column %s", def.name)); + // Fallthrough on purpose + case CONTAINS: + { + if (!receiver.type.isCollection()) + throw new InvalidRequestException(String.format("Cannot use %s relation on non collection column %s", newRel.operator(), def.name)); + + if (restriction == null) + restriction = new Restriction.Contains(); + else if 
(!restriction.isContains()) + throw new InvalidRequestException(String.format("Collection column %s can only be restricted by CONTAINS or CONTAINS KEY", def.name)); + boolean isKey = newRel.operator() == Relation.Type.CONTAINS_KEY; + receiver = makeCollectionReceiver(receiver, isKey); + Term t = newRel.getValue().prepare(receiver); + ((Restriction.Contains)restriction).add(t, isKey); + } } return restriction; } + private static ColumnSpecification makeCollectionReceiver(ColumnSpecification collection, boolean isKey) + { + assert collection.type.isCollection(); + switch (((CollectionType)collection.type).kind) + { + case LIST: + assert !isKey; + return Lists.valueSpecOf(collection); + case SET: + assert !isKey; + return Sets.valueSpecOf(collection); + case MAP: + return isKey ? Maps.keySpecOf(collection) : Maps.valueSpecOf(collection); + } + throw new AssertionError(); + } + @Override public String toString() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index b00d220..396bbd3 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1697,7 +1697,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean removeDroppedColumns(data); - if (!filter.isSatisfiedBy(rawRow.key, data, null)) + if (!filter.isSatisfiedBy(rawRow.key, data, null, null)) continue; logger.trace("{} satisfies all filter expressions", data); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/IndexExpression.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/IndexExpression.java 
b/src/java/org/apache/cassandra/db/IndexExpression.java index e08e41f..b57890a 100644 --- a/src/java/org/apache/cassandra/db/IndexExpression.java +++ b/src/java/org/apache/cassandra/db/IndexExpression.java @@ -40,7 +40,7 @@ public class IndexExpression public enum Operator { - EQ, GTE, GT, LTE, LT; + EQ, GTE, GT, LTE, LT, CONTAINS, CONTAINS_KEY; public static Operator findByOrdinal(int ordinal) { @@ -55,10 +55,27 @@ public class IndexExpression return LTE; case 4: return LT; + case 5: + return CONTAINS; + case 6: + return CONTAINS_KEY; default: throw new AssertionError(); } } + + public boolean allowsIndexQuery() + { + switch (this) + { + case EQ: + case CONTAINS: + case CONTAINS_KEY: + return true; + default: + return false; + } + } } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java index 89d6683..e749871 100644 --- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db.filter; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; @@ -30,8 +31,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.cql3.ColumnNameBuilder; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; /** @@ -129,7 +129,7 @@ public abstract class ExtendedFilter * @return true if the provided data 
satisfies all the expressions from * the clause of this filter. */ - public abstract boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder); + public abstract boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder, ByteBuffer collectionElement); public static boolean satisfies(int comparison, IndexExpression.Operator op) { @@ -279,10 +279,8 @@ public abstract class ExtendedFilter return pruned; } - public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder) + public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder, ByteBuffer collectionElement) { - // We enforces even the primary clause because reads are not synchronized with writes and it is thus possible to have a race - // where the index returned a row which doesn't have the primary column when we actually read it for (IndexExpression expression : clause) { ColumnDefinition def = data.metadata().getColumnDefinition(expression.column); @@ -301,6 +299,13 @@ public abstract class ExtendedFilter } else { + if (def.type.isCollection()) + { + if (!collectionSatisfies(def, data, builder, expression, collectionElement)) + return false; + continue; + } + dataValue = extractDataValue(def, rowKey.key, data, builder); validator = def.type; } @@ -315,6 +320,34 @@ public abstract class ExtendedFilter return true; } + private static boolean collectionSatisfies(ColumnDefinition def, ColumnFamily data, ColumnNameBuilder builder, IndexExpression expr, ByteBuffer collectionElement) + { + assert def.type.isCollection(); + + CollectionType type = (CollectionType)def.type; + builder = builder.copy().add(def.name.bytes); + + switch (type.kind) + { + case LIST: + assert collectionElement != null; + return type.valueComparator().compare(data.getColumn(builder.add(collectionElement).build()).value(), expr.value) == 0; + case SET: + return data.getColumn(builder.add(expr.value).build()) != null; + 
case MAP: + if (expr.operator == IndexExpression.Operator.CONTAINS_KEY) + { + return data.getColumn(builder.add(expr.value).build()) != null; + } + else + { + assert collectionElement != null; + return type.valueComparator().compare(data.getColumn(builder.add(collectionElement).build()).value(), expr.value) == 0; + } + } + throw new AssertionError(); + } + private ByteBuffer extractDataValue(ColumnDefinition def, ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder) { switch (def.kind) @@ -359,7 +392,7 @@ public abstract class ExtendedFilter return data; } - public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder) + public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder, ByteBuffer collectionElement) { return true; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java index 699b391..b7593ad 100644 --- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java +++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java @@ -50,10 +50,21 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec CFMetaData indexedCfMetadata = CFMetaData.newIndexMetadata(baseCfs.metadata, columnDef, indexComparator); indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace, indexedCfMetadata.cfName, - new LocalPartitioner(columnDef.type), + new LocalPartitioner(getIndexKeyComparator()), indexedCfMetadata); } + protected AbstractType<?> getIndexKeyComparator() + { + return columnDef.type; + } + + @Override + public DecoratedKey getIndexKeyFor(ByteBuffer value) + { + return 
new DecoratedKey(new LocalToken(getIndexKeyComparator(), value), value); + } + protected abstract ByteBuffer makeIndexColumnName(ByteBuffer rowKey, Column column); protected abstract ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java index f18357b..a508a15 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java @@ -67,7 +67,7 @@ public abstract class SecondaryIndexSearcher continue; SecondaryIndex index = indexManager.getIndexForColumn(expression.column); - if (index == null || expression.operator != IndexExpression.Operator.EQ) + if (index == null || !expression.operator.allowsIndexQuery()) continue; int columns = index.getIndexCfs().getMeanColumns(); candidates.put(index, columns); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java index 3dea495..6d137ca 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java @@ -53,6 +53,19 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn public static CompositesIndex create(ColumnDefinition cfDef) { + if (cfDef.type.isCollection()) + { + switch (((CollectionType)cfDef.type).kind) + { + case LIST: + return new 
CompositesIndexOnCollectionValue(); + case SET: + return new CompositesIndexOnCollectionKey(); + case MAP: + return new CompositesIndexOnCollectionValue(); + } + } + switch (cfDef.kind) { case CLUSTERING_COLUMN: @@ -70,6 +83,19 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn // Check SecondaryIndex.getIndexComparator if you want to know why this is static public static CompositeType getIndexComparator(CFMetaData baseMetadata, ColumnDefinition cfDef) { + if (cfDef.type.isCollection()) + { + switch (((CollectionType)cfDef.type).kind) + { + case LIST: + return CompositesIndexOnCollectionValue.buildIndexComparator(baseMetadata, cfDef); + case SET: + return CompositesIndexOnCollectionKey.buildIndexComparator(baseMetadata, cfDef); + case MAP: + return CompositesIndexOnCollectionValue.buildIndexComparator(baseMetadata, cfDef); + } + } + switch (cfDef.kind) { case CLUSTERING_COLUMN: @@ -127,10 +153,12 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn ColumnDefinition columnDef = columnDefs.iterator().next(); Map<String, String> options = new HashMap<String, String>(columnDef.getIndexOptions()); - // We take no options though we used to have one called "prefix_size", - // so skip it silently for backward compatibility sake. + // We used to have an option called "prefix_size" so skip it silently for backward compatibility sake. 
options.remove("prefix_size"); + if (columnDef.type.isCollection()) + options.remove("index_values"); + if (!options.isEmpty()) throw new ConfigurationException("Unknown options provided for COMPOSITES index: " + options.keySet()); } @@ -143,14 +171,30 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn public final ByteBuffer indexedKey; public final ColumnNameBuilder indexedEntryNameBuilder; + public final ByteBuffer indexedEntryCollectionKey; // may be null + + public IndexedEntry(DecoratedKey indexValue, + ByteBuffer indexEntry, + long timestamp, + ByteBuffer indexedKey, + ColumnNameBuilder indexedEntryNameBuilder) + { + this(indexValue, indexEntry, timestamp, indexedKey, indexedEntryNameBuilder, null); + } - public IndexedEntry(DecoratedKey indexValue, ByteBuffer indexEntry, long timestamp, ByteBuffer indexedKey, ColumnNameBuilder indexedEntryNameBuilder) + public IndexedEntry(DecoratedKey indexValue, + ByteBuffer indexEntry, + long timestamp, + ByteBuffer indexedKey, + ColumnNameBuilder indexedEntryNameBuilder, + ByteBuffer indexedEntryCollectionKey) { this.indexValue = indexValue; this.indexEntry = indexEntry; this.timestamp = timestamp; this.indexedKey = indexedKey; this.indexedEntryNameBuilder = indexedEntryNameBuilder; + this.indexedEntryCollectionKey = indexedEntryCollectionKey; } public ByteBuffer indexedEntryStart() http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java new file mode 100644 index 0000000..c2acfc9 --- /dev/null +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.index.composites; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.ColumnNameBuilder; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.index.SecondaryIndex; +import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.dht.LocalToken; + +/** + * Index on the collection element of the cell name of a collection. + * + * A cell indexed by this index will have the general form: + * ck_0 ... ck_n c_name [col_elt] : v + * where ck_i are the cluster keys, c_name the CQL3 column name, col_elt the + * collection element that we want to index (which may or may not be there depending + * on whether c_name is the collection we're indexing) and v the cell value. + * + * Such a cell is indexed if c_name is the indexed collection (in which case we are guaranteed to have + * col_elt). The index entry will be: + * - row key will be col_elt value (getIndexedValue()). + * - cell name will be 'rk ck_0 ... ck_n' where rk is the row key of the initial cell. 
+ */ +public class CompositesIndexOnCollectionKey extends CompositesIndex +{ + public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef) + { + int count = 1 + baseMetadata.clusteringColumns().size(); // row key + clustering prefix + List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(count); + List<AbstractType<?>> ckTypes = baseMetadata.comparator.getComponents(); + types.add(SecondaryIndex.keyComparator); + for (int i = 0; i < count - 1; i++) + types.add(ckTypes.get(i)); + return CompositeType.getInstance(types); + } + + @Override + protected AbstractType<?> getIndexKeyComparator() + { + return ((CollectionType)columnDef.type).nameComparator(); + } + + protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column) + { + CompositeType baseComparator = (CompositeType)baseCfs.getComparator(); + ByteBuffer[] components = baseComparator.split(column.name()); + return components[columnDef.position() + 1]; + } + + protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName) + { + int count = 1 + baseCfs.metadata.clusteringColumns().size(); + CompositeType baseComparator = (CompositeType)baseCfs.getComparator(); + ByteBuffer[] components = baseComparator.split(columnName); + CompositeType.Builder builder = getIndexComparator().builder(); + builder.add(rowKey); + for (int i = 0; i < count - 1; i++) + builder.add(components[i]); + return builder; + } + + public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry) + { + int count = 1 + baseCfs.metadata.clusteringColumns().size(); + ByteBuffer[] components = getIndexComparator().split(indexEntry.name()); + + ColumnNameBuilder builder = getBaseComparator().builder(); + for (int i = 0; i < count - 1; i++) + builder.add(components[i + 1]); + + return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), components[0], builder); + } + + @Override + public boolean indexes(ByteBuffer name) + { + // 
We index if the CQL3 column name is the one of the collection we index + ByteBuffer[] components = getBaseComparator().split(name); + AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef); + return components.length > columnDef.position() + && comp.compare(components[columnDef.position()], columnDef.name.bytes) == 0; + } + + public boolean isStale(IndexedEntry entry, ColumnFamily data, long now) + { + ByteBuffer bb = entry.indexedEntryNameBuilder.copy().add(columnDef.name).add(entry.indexValue.key).build(); + Column liveColumn = data.getColumn(bb); + return (liveColumn == null || liveColumn.isMarkedForDelete(now)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java new file mode 100644 index 0000000..f416d0e --- /dev/null +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.index.composites; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.ColumnNameBuilder; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.index.SecondaryIndex; +import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.dht.LocalToken; + +/** + * Indexes the value of a collection cell. + * + * This is a lot like an index on a REGULAR column, except that we also need to make + * the collection key part of the index entry so that: + * 1) we don't have to scan the whole collection at query time to know whether the + * entry is stale and whether it still satisfies the query. + * 2) if a collection contains the same value multiple times, we get one entry + * per occurrence, so that deleting only one of those occurrences deletes only + * the index entry corresponding to that occurrence.
+ */ +public class CompositesIndexOnCollectionValue extends CompositesIndex +{ + /** Builds the comparator for index entry names: the base row key (SecondaryIndex.keyComparator), then the first columnDef.position() component types of the base comparator (the clustering prefix preceding the collection column), then the collection's name comparator for the collection key. */ public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef) + { + int prefixSize = columnDef.position(); + List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(prefixSize + 2); + types.add(SecondaryIndex.keyComparator); + for (int i = 0; i < prefixSize; i++) + types.add(((CompositeType)baseMetadata.comparator).types.get(i)); + types.add(((CollectionType)columnDef.type).nameComparator()); // collection key + return CompositeType.getInstance(types); + } + + /** Index keys (the indexed values) are collection element values, compared with the collection's value comparator. */ @Override + protected AbstractType<?> getIndexKeyComparator() + { + return ((CollectionType)columnDef.type).valueComparator(); + } + + /** The indexed value is simply the cell's value. */ protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column) + { + return column.value(); + } + + /** Builds an index entry name: the base row key, the clustering prefix, then the collection key (the cell-name component right after the collection column name, which is skipped). The assert checks that the cell name has every component of the base comparator. */ protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName) + { + int prefixSize = columnDef.position(); + CompositeType baseComparator = (CompositeType)baseCfs.getComparator(); + ByteBuffer[] components = baseComparator.split(columnName); + assert components.length == baseComparator.types.size(); + CompositeType.Builder builder = getIndexComparator().builder(); + builder.add(rowKey); + for (int i = 0; i < prefixSize; i++) + builder.add(components[i]); + builder.add(components[prefixSize + 1]); + return builder; + } + + /** Decodes an index entry: name component 0 is the base row key, components 1..prefixSize rebuild the clustering prefix, and component prefixSize + 1 is the collection key, which is passed to the IndexedEntry. */ public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry) + { + int prefixSize = columnDef.position(); + ByteBuffer[] components = getIndexComparator().split(indexEntry.name()); + CompositeType.Builder builder = getBaseComparator().builder(); + for (int i = 0; i < prefixSize; i++) + builder.add(components[i + 1]); + return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), components[0], builder, components[prefixSize + 1]); + } + + /** Returns true for cells of the indexed collection column: the cell name must have a component at the column's position and it must equal the column name. */ @Override + public boolean indexes(ByteBuffer name) + { + ByteBuffer[] components = getBaseComparator().split(name); + 
AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef); + return components.length > columnDef.position() + && comp.compare(components[columnDef.position()], columnDef.name.bytes) == 0; + } + + /** An entry is stale when the base cell it refers to (entry clustering prefix, then column name, then the entry's collection key) is missing or deleted as of {@code now}, or when that cell's current value no longer equals the indexed value. */ public boolean isStale(IndexedEntry entry, ColumnFamily data, long now) + { + ByteBuffer bb = entry.indexedEntryNameBuilder.copy().add(columnDef.name).add(entry.indexedEntryCollectionKey).build(); + Column liveColumn = data.getColumn(bb); + if (liveColumn == null || liveColumn.isMarkedForDelete(now)) + return true; + + ByteBuffer liveValue = liveColumn.value(); + return ((CollectionType)columnDef.type).valueComparator().compare(entry.indexValue.key, liveValue) != 0; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java index 0d5d1a5..bcb0dd2 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java @@ -135,6 +135,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher */ DecoratedKey currentKey = null; ColumnFamily data = null; + ByteBuffer previousPrefix = null; while (true) { @@ -232,6 +233,16 @@ public class CompositesSearcher extends SecondaryIndexSearcher if (!filter.columnFilter(dk.key).maySelectPrefix(baseComparator, start)) continue; + // If we've record the previous prefix, it means we're dealing with an index on the collection value. In + // that case, we can have multiple index prefix for the same CQL3 row. 
In that case, we want to only add + // the CQL3 row once (because requesting the data multiple time would be inefficient but more importantly + // because we shouldn't count the columns multiple times with the lastCounted() call at the end of this + // method). + if (previousPrefix != null && previousPrefix.equals(start)) + continue; + else + previousPrefix = null; + logger.trace("Adding index hit to current row for {}", indexComparator.getString(column.name())); // We always query the whole CQL3 row. In the case where the original filter was a name filter this might be @@ -248,9 +259,15 @@ public class CompositesSearcher extends SecondaryIndexSearcher continue; } - assert newData != null : "An entry with not data should have been considered stale"; + assert newData != null : "An entry with no data should have been considered stale"; + + // We know the entry is not stale and so the entry satisfy the primary clause. So whether + // or not the data satisfies the other clauses, there will be no point to re-check the + // same CQL3 row if we run into another collection value entry for this row. + if (entry.indexedEntryCollectionKey != null) + previousPrefix = start; - if (!filter.isSatisfiedBy(dk, newData, entry.indexedEntryNameBuilder)) + if (!filter.isSatisfiedBy(dk, newData, entry.indexedEntryNameBuilder, entry.indexedEntryCollectionKey)) continue; if (data == null)