Add query time validation method on Index Patch by Sam Tunnicliffe; reviewed by Andrés de la Peña for CASSANDRA-11043
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9cfbc31b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9cfbc31b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9cfbc31b Branch: refs/heads/trunk Commit: 9cfbc31bc29685bd60355a823e0cf261a89858f0 Parents: ac7e279 Author: Sam Tunnicliffe <s...@beobal.com> Authored: Mon Feb 8 15:22:41 2016 +0000 Committer: Sam Tunnicliffe <s...@beobal.com> Committed: Mon Feb 15 13:08:00 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cql3/statements/SelectStatement.java | 22 +++++- .../org/apache/cassandra/db/ReadCommand.java | 13 ++++ src/java/org/apache/cassandra/index/Index.java | 24 ++++-- .../service/pager/RangeSliceQueryPager.java | 8 +- .../cassandra/thrift/CassandraServer.java | 8 ++ .../org/apache/cassandra/cql3/CQLTester.java | 32 +++++--- .../apache/cassandra/index/CustomIndexTest.java | 82 +++++++++++++------- 8 files changed, 144 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cfbc31b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a7669bb..52fc3ab 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.4 + * Add query time validation method on Index (CASSANDRA-11043) * Avoid potential AssertionError in mixed version cluster (CASSANDRA-11128) * Properly handle hinted handoff after topology changes (CASSANDRA-5902) * AssertionError when listing sstable files on inconsistent disk state (CASSANDRA-11156) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cfbc31b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index e84ab28..931813a 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -46,6 +46,7 @@ import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.db.view.View; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.exceptions.*; +import org.apache.cassandra.index.Index; import org.apache.cassandra.index.SecondaryIndexManager; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.service.ClientState; @@ -490,9 +491,24 @@ public class SelectStatement implements CQLStatement // The LIMIT provided by the user is the number of CQL row he wants returned. // We want to have getRangeSlice to count the number of columns, not the number of keys. AbstractBounds<PartitionPosition> keyBounds = restrictions.getPartitionKeyBounds(options); - return keyBounds == null - ? ReadQuery.EMPTY - : new PartitionRangeReadCommand(cfm, nowInSec, queriedColumns, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter), Optional.empty()); + if (keyBounds == null) + return ReadQuery.EMPTY; + + PartitionRangeReadCommand command = new PartitionRangeReadCommand(cfm, + nowInSec, + queriedColumns, + rowFilter, + limit, + new DataRange(keyBounds, clusteringIndexFilter), + Optional.empty()); + // If there's a secondary index that the command can use, have it validate + // the request parameters. Note that as a side effect, if a viable Index is + // identified by the CFS's index manager, it will be cached in the command + // and serialized during distribution to replicas in order to avoid performing + // further lookups. + command.maybeValidateIndex(); + + return command; } private ClusteringIndexFilter makeClusteringIndexFilter(QueryOptions options) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cfbc31b/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 97c3d07..c792a5a 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -321,6 +321,19 @@ public abstract class ReadCommand implements ReadQuery } /** + * If the index manager for the CFS determines that there's an applicable + * 2i that can be used to execute this command, call its (optional) + * validation method to check that nothing in this command's parameters + * violates the implementation specific validation rules. + */ + public void maybeValidateIndex() + { + Index index = getIndex(Keyspace.openAndGetStore(metadata)); + if (null != index) + index.validate(this); + } + + /** * Executes this command on the local host. * * @param orderGroup the operation group spanning this command http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cfbc31b/src/java/org/apache/cassandra/index/Index.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java index 71dcfc9..ab6665d 100644 --- a/src/java/org/apache/cassandra/index/Index.java +++ b/src/java/org/apache/cassandra/index/Index.java @@ -395,6 +395,24 @@ public interface Index */ /** + * Used to validate the various parameters of a supplied {@code}ReadCommand{@code}, + * this is called prior to execution. In theory, any command instance may be checked + * by any {@code}Index{@code} instance, but in practice the index will be the one + * returned by a call to the {@code}getIndex(ColumnFamilyStore cfs){@code} method on + * the supplied command. + * + * Custom index implementations should perform any validation of query expressions here and throw a meaningful + * InvalidRequestException when any expression or other parameter is invalid. + * + * @param command a ReadCommand whose parameters are to be verified + * @throws InvalidRequestException if the details of the command fail to meet the + * index's validation rules + */ + default void validate(ReadCommand command) throws InvalidRequestException + { + } + + /** * Return a function which performs post processing on the results of a partition range read command. * In future, this may be used as a generalized mechanism for transforming results on the coordinator prior * to returning them to the caller. @@ -412,15 +430,11 @@ public interface Index /** * Factory method for query time search helper. - * Custom index implementations should perform any validation of query expressions here and throw a meaningful - * InvalidRequestException when any expression is invalid. * * @param command the read command being executed * @return an Searcher with which to perform the supplied command - * @throws InvalidRequestException if the command's expressions are invalid according to the - * specific syntax supported by the index implementation. */ - public Searcher searcherFor(ReadCommand command) throws InvalidRequestException; + public Searcher searcherFor(ReadCommand command); /** * Performs the actual index lookup during execution of a ReadCommand. http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cfbc31b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java index fd35b29..6ad8649 100644 --- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java @@ -27,6 +27,8 @@ import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.dht.*; import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.schema.IndexMetadata; /** * Pages a RangeSliceCommand whose predicate is a slice query. @@ -89,9 +91,9 @@ public class RangeSliceQueryPager extends AbstractQueryPager } } - // it won't hurt for the next page command to query the index manager - // again to check for an applicable index, so don't supply one here - return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange, Optional.empty()); + Index index = command.getIndex(Keyspace.openAndGetStore(command.metadata())); + Optional<IndexMetadata> indexMetadata = index != null ? Optional.of(index.getIndexMetadata()) : Optional.empty(); + return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange, indexMetadata); } protected void recordLast(DecoratedKey key, Row last) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cfbc31b/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index ee86f9d..ee3bfa3 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -48,6 +48,7 @@ import org.apache.cassandra.db.view.View; import org.apache.cassandra.dht.*; import org.apache.cassandra.dht.Range; import org.apache.cassandra.exceptions.*; +import org.apache.cassandra.index.Index; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.locator.DynamicEndpointSnitch; import org.apache.cassandra.metrics.ClientMetrics; @@ -1715,6 +1716,13 @@ public class CassandraServer implements Cassandra.Iface limits, new DataRange(bounds, filter), Optional.empty()); + // If there's a secondary index that the command can use, have it validate + // the request parameters. Note that as a side effect, if a viable Index is + // identified by the CFS's index manager, it will be cached in the command + // and serialized during distribution to replicas in order to avoid performing + // further lookups. + cmd.maybeValidateIndex(); + try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel)) { return thriftifyKeySlices(results, column_parent, limits.perPartitionCount()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cfbc31b/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 71bc238..43de101 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -31,18 +31,15 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import com.datastax.driver.core.*; -import com.datastax.driver.core.ResultSet; - import com.google.common.base.Objects; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; - import org.junit.*; - -import com.datastax.driver.core.Cluster; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.*; +import com.datastax.driver.core.ResultSet; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.CFMetaData; @@ -56,7 +53,8 @@ import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.db.marshal.TupleType; import org.apache.cassandra.dht.Murmur3Partitioner; -import org.apache.cassandra.exceptions.*; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.serializers.TypeSerializer; import org.apache.cassandra.service.ClientState; @@ -66,6 +64,7 @@ import org.apache.cassandra.transport.Event; import org.apache.cassandra.transport.Server; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.ByteBufferUtil; + import static junit.framework.Assert.assertNotNull; /** @@ -1005,15 +1004,30 @@ public abstract class CQLTester protected void assertInvalidThrowMessage(String errorMessage, Class<? extends Throwable> exception, String query, Object... values) throws Throwable { + assertInvalidThrowMessage(Integer.MIN_VALUE, errorMessage, exception, query, values); + } + + // if a protocol version > Integer.MIN_VALUE is supplied, executes + // the query via the java driver, mimicking a real client. + protected void assertInvalidThrowMessage(int protocolVersion, + String errorMessage, + Class<? extends Throwable> exception, + String query, + Object... values) throws Throwable + { try { - execute(query, values); + if (protocolVersion == Integer.MIN_VALUE) + execute(query, values); + else + executeNet(protocolVersion, query, values); + String q = USE_PREPARED_VALUES ? query + " (values: " + formatAllValues(values) + ")" : replaceValues(query, values); Assert.fail("Query should be invalid but no error was thrown. Query is: " + q); } - catch (CassandraException e) + catch (Exception e) { if (exception != null && !exception.isAssignableFrom(e.getClass())) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cfbc31b/test/unit/org/apache/cassandra/index/CustomIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java index 3bfb6a5..bc432ca 100644 --- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java +++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java @@ -9,6 +9,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.junit.Test; +import com.datastax.driver.core.exceptions.QueryValidationException; import org.apache.cassandra.Util; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.cql3.CQLTester; @@ -27,6 +28,7 @@ import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.Indexes; +import org.apache.cassandra.transport.Server; import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.Util.throwAssert; @@ -321,8 +323,10 @@ public class CustomIndexTest extends CQLTester createIndex(String.format("CREATE CUSTOM INDEX custom_index ON %%s(c) USING '%s'", StubIndex.class.getName())); - assertInvalidMessage(String.format(IndexRestrictions.INDEX_NOT_FOUND, "no_such_index", keyspace(), currentTable()), - "SELECT * FROM %s WHERE expr(no_such_index, 'foo bar baz ')"); + assertInvalidThrowMessage(Server.CURRENT_VERSION, + String.format(IndexRestrictions.INDEX_NOT_FOUND, "no_such_index", keyspace(), currentTable()), + QueryValidationException.class, + "SELECT * FROM %s WHERE expr(no_such_index, 'foo bar baz ')"); // simple case assertRows(execute("SELECT * FROM %s WHERE expr(custom_index, 'foo bar baz')"), row); @@ -330,16 +334,22 @@ public class CustomIndexTest extends CQLTester assertRows(execute("SELECT * FROM %s WHERE expr(custom_index, $$foo \" ~~~ bar Baz$$)"), row); // multiple expressions on the same index - assertInvalidMessage(IndexRestrictions.MULTIPLE_EXPRESSIONS, - "SELECT * FROM %s WHERE expr(custom_index, 'foo') AND expr(custom_index, 'bar')"); + assertInvalidThrowMessage(Server.CURRENT_VERSION, + IndexRestrictions.MULTIPLE_EXPRESSIONS, + QueryValidationException.class, + "SELECT * FROM %s WHERE expr(custom_index, 'foo') AND expr(custom_index, 'bar')"); // multiple expressions on different indexes createIndex(String.format("CREATE CUSTOM INDEX other_custom_index ON %%s(d) USING '%s'", StubIndex.class.getName())); - assertInvalidMessage(IndexRestrictions.MULTIPLE_EXPRESSIONS, - "SELECT * FROM %s WHERE expr(custom_index, 'foo') AND expr(other_custom_index, 'bar')"); - - assertInvalidMessage(SelectStatement.REQUIRES_ALLOW_FILTERING_MESSAGE, - "SELECT * FROM %s WHERE expr(custom_index, 'foo') AND d=0"); + assertInvalidThrowMessage(Server.CURRENT_VERSION, + IndexRestrictions.MULTIPLE_EXPRESSIONS, + QueryValidationException.class, + "SELECT * FROM %s WHERE expr(custom_index, 'foo') AND expr(other_custom_index, 'bar')"); + + assertInvalidThrowMessage(Server.CURRENT_VERSION, + SelectStatement.REQUIRES_ALLOW_FILTERING_MESSAGE, + QueryValidationException.class, + "SELECT * FROM %s WHERE expr(custom_index, 'foo') AND d=0"); assertRows(execute("SELECT * FROM %s WHERE expr(custom_index, 'foo') AND d=0 ALLOW FILTERING"), row); } @@ -349,8 +359,10 @@ public class CustomIndexTest extends CQLTester createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))"); createIndex(String.format("CREATE CUSTOM INDEX custom_index ON %%s(c) USING '%s'", NoCustomExpressionsIndex.class.getName())); - assertInvalidMessage(String.format( IndexRestrictions.CUSTOM_EXPRESSION_NOT_SUPPORTED, "custom_index"), - "SELECT * FROM %s WHERE expr(custom_index, 'foo bar baz')"); + assertInvalidThrowMessage(Server.CURRENT_VERSION, + String.format( IndexRestrictions.CUSTOM_EXPRESSION_NOT_SUPPORTED, "custom_index"), + QueryValidationException.class, + "SELECT * FROM %s WHERE expr(custom_index, 'foo bar baz')"); } @Test @@ -358,8 +370,11 @@ public class CustomIndexTest extends CQLTester { createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))"); createIndex(String.format("CREATE CUSTOM INDEX custom_index ON %%s(c) USING '%s'", - ExpressionRejectingIndex.class.getName())); - assertInvalidMessage("None shall pass", "SELECT * FROM %s WHERE expr(custom_index, 'foo bar baz')"); + AlwaysRejectIndex.class.getName())); + assertInvalidThrowMessage(Server.CURRENT_VERSION, + "None shall pass", + QueryValidationException.class, + "SELECT * FROM %s WHERE expr(custom_index, 'foo bar baz')"); } @Test @@ -367,8 +382,10 @@ public class CustomIndexTest extends CQLTester { createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))"); createIndex("CREATE INDEX non_custom_index ON %s(c)"); - assertInvalidMessage(String.format(IndexRestrictions.NON_CUSTOM_INDEX_IN_EXPRESSION, "non_custom_index"), - "SELECT * FROM %s WHERE expr(non_custom_index, 'c=0')"); + assertInvalidThrowMessage(Server.CURRENT_VERSION, + String.format(IndexRestrictions.NON_CUSTOM_INDEX_IN_EXPRESSION, "non_custom_index"), + QueryValidationException.class, + "SELECT * FROM %s WHERE expr(non_custom_index, 'c=0')"); } @Test @@ -377,10 +394,14 @@ public class CustomIndexTest extends CQLTester createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))"); createIndex(String.format("CREATE CUSTOM INDEX custom_index ON %%s(c) USING '%s'", StubIndex.class.getName())); - assertInvalidMessage(ModificationStatement.CUSTOM_EXPRESSIONS_NOT_ALLOWED, - "DELETE FROM %s WHERE expr(custom_index, 'foo bar baz ')"); - assertInvalidMessage(ModificationStatement.CUSTOM_EXPRESSIONS_NOT_ALLOWED, - "UPDATE %s SET d=0 WHERE expr(custom_index, 'foo bar baz ')"); + assertInvalidThrowMessage(Server.CURRENT_VERSION, + ModificationStatement.CUSTOM_EXPRESSIONS_NOT_ALLOWED, + QueryValidationException.class, + "DELETE FROM %s WHERE expr(custom_index, 'foo bar baz ')"); + assertInvalidThrowMessage(Server.CURRENT_VERSION, + ModificationStatement.CUSTOM_EXPRESSIONS_NOT_ALLOWED, + QueryValidationException.class, + "UPDATE %s SET d=0 WHERE expr(custom_index, 'foo bar baz ')"); } @Test @@ -451,12 +472,16 @@ public class CustomIndexTest extends CQLTester UTF8ExpressionIndex.class.getName())); execute("SELECT * FROM %s WHERE expr(text_index, 'foo')"); - assertInvalidMessage("Invalid INTEGER constant (99) for \"custom index expression\" of type text", - "SELECT * FROM %s WHERE expr(text_index, 99)"); + assertInvalidThrowMessage(Server.CURRENT_VERSION, + "Invalid INTEGER constant (99) for \"custom index expression\" of type text", + QueryValidationException.class, + "SELECT * FROM %s WHERE expr(text_index, 99)"); execute("SELECT * FROM %s WHERE expr(int_index, 99)"); - assertInvalidMessage("Invalid STRING constant (foo) for \"custom index expression\" of type int", - "SELECT * FROM %s WHERE expr(int_index, 'foo')"); + assertInvalidThrowMessage(Server.CURRENT_VERSION, + "Invalid STRING constant (foo) for \"custom index expression\" of type int", + QueryValidationException.class, + "SELECT * FROM %s WHERE expr(int_index, 'foo')"); } @Test @@ -683,17 +708,22 @@ public class CustomIndexTest extends CQLTester } } - public static final class ExpressionRejectingIndex extends StubIndex + public static final class AlwaysRejectIndex extends StubIndex { - public ExpressionRejectingIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata) + public AlwaysRejectIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata) { super(baseCfs, metadata); } - public Searcher searcherFor(ReadCommand command) throws InvalidRequestException + public void validate(ReadCommand command) throws InvalidRequestException { throw new InvalidRequestException("None shall pass"); } + + public Searcher searcherFor(ReadCommand command) + { + throw new InvalidRequestException("None shall pass (though I'd have expected to fail faster)"); + } } public static final class IndexWithValidateOptions extends StubIndex