Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 32a6f2059 -> 7f1fec190 refs/heads/cassandra-2.2 b154622b6 -> ca8e9a97b refs/heads/cassandra-3.0 a8014bb7e -> 3d986936f refs/heads/trunk 9df213941 -> ef1b4c0f5
Add validation method to PerRowSecondaryIndex Patch by Andrés de la Peña; reviewed by Sam Tunnicliffe for CASSANDRA-10092 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7f1fec19 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7f1fec19 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7f1fec19 Branch: refs/heads/cassandra-2.1 Commit: 7f1fec19080c423d89ce3af823e2b1532b755035 Parents: 32a6f20 Author: Andrés de la Peña <a.penya.gar...@gmail.com> Authored: Wed Oct 28 16:41:12 2015 +0000 Committer: Sam Tunnicliffe <s...@beobal.com> Committed: Thu Oct 29 10:49:19 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 2 + .../cql3/statements/UpdateStatement.java | 1 + .../db/index/PerRowSecondaryIndex.java | 5 + .../db/index/SecondaryIndexManager.java | 8 + .../cassandra/thrift/CassandraServer.java | 11 + .../db/index/PerRowSecondaryIndexTest.java | 211 ++++++++++++++++++- 7 files changed, 234 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f1fec19/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 998dd22..3d22b91 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.12 + * Add validation method to PerRowSecondaryIndex (CASSANDRA-10092) * Support encrypted and plain traffic on the same port (CASSANDRA-10559) * Do STCS in DTCS windows (CASSANDRA-10276) * Don't try to get ancestors from half-renamed sstables (CASSANDRA-10501) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f1fec19/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index c6ea6c0..712a2c4 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -25,6 +25,8 @@ New features If moving from the SimpleSnitch, make 
sure the rack containing all current nodes is named "rack1". To override this behavior when manually wiping the node and bootstrapping, use -Dcassandra.ignore_rack=true. + - a new validate(key, cf) method is added to PerRowSecondaryIndex. A default + implementation is provided, so no changes are required to custom implementations. 2.1.11 http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f1fec19/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java index 06282ad..bf9a059 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java @@ -141,6 +141,7 @@ public class UpdateStatement extends ModificationStatement cfm.cfName)); } } + indexManager.validateRowLevelIndexes(key, cf); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f1fec19/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java index f6f0e8d..5a3d457 100644 --- a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java +++ b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db.index; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; +import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.db.Cell; @@ -69,4 +70,8 @@ public abstract class PerRowSecondaryIndex extends SecondaryIndex { return true; } + + public void 
validate(ByteBuffer key, ColumnFamily cf) throws InvalidRequestException + { + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f1fec19/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java index 12a0a55..179126b 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java @@ -683,6 +683,14 @@ public class SecondaryIndexManager return null; } + public void validateRowLevelIndexes(ByteBuffer key, ColumnFamily cf) throws InvalidRequestException + { + for (SecondaryIndex index : rowLevelIndexMap.values()) + { + ((PerRowSecondaryIndex) index).validate(key, cf); + } + } + static boolean shouldCleanupOldValue(Cell oldCell, Cell newCell) { // If any one of name/value/timestamp are different, then we http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f1fec19/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index fd87b57..7103721 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -684,6 +684,10 @@ public class CassandraServer implements Cassandra.Iface ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cState.getKeyspace(), column_parent.column_family); cf.addColumn(name, column.value, column.timestamp, column.ttl); + + // Validate row level indexes. See CASSANDRA-10092 for more details. 
+ Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validateRowLevelIndexes(key, cf); + mutation = new org.apache.cassandra.db.Mutation(cState.getKeyspace(), key, cf); } catch (MarshalException e) @@ -778,6 +782,9 @@ public class CassandraServer implements Cassandra.Iface for (Column column : updates) cfUpdates.addColumn(cfm.comparator.cellFromByteBuffer(column.name), column.value, column.timestamp); + // Validate row level indexes. See CASSANDRA-10092 for more details. + Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validateRowLevelIndexes(key, cfUpdates); + ColumnFamily cfExpected; if (expected.isEmpty()) { @@ -874,6 +881,10 @@ public class CassandraServer implements Cassandra.Iface addColumnOrSuperColumn(mutation, metadata, m.column_or_supercolumn); } } + + // Validate row level indexes. See CASSANDRA-10092 for more details. + ColumnFamily cf = mutation.addOrGet(metadata); + Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validateRowLevelIndexes(key, cf); } if (standardMutation != null && !standardMutation.isEmpty()) mutations.add(standardMutation); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f1fec19/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java index 2e885aa..a332342 100644 --- a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java +++ b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db.index; import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; @@ -26,28 +27,48 @@ import java.util.List; import java.util.Set; import 
org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.columniterator.IdentityQueryFilter; +import org.apache.cassandra.db.Cell; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.IndexExpression; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.Row; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.filter.ExtendedFilter; import org.apache.cassandra.db.filter.QueryFilter; -import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.service.EmbeddedCassandraService; +import org.apache.cassandra.service.MigrationManager; +import org.apache.cassandra.thrift.CassandraServer; +import org.apache.cassandra.thrift.Column; +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.ColumnParent; +import org.apache.cassandra.thrift.ColumnPath; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.ThriftSessionManager; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.thrift.TException; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import 
static org.junit.Assert.fail; -public class PerRowSecondaryIndexTest extends SchemaLoader +public class PerRowSecondaryIndexTest { // test that when index(key) is called on a PRSI index, @@ -56,6 +77,29 @@ public class PerRowSecondaryIndexTest extends SchemaLoader // indexed & stashes it in a static variable for inspection // in the test. + private static final String KEYSPACE1 = "PerRowSecondaryIndex"; + private static final String CF_INDEXED = "Indexed1"; + private static final String INDEXED_COLUMN = "indexed"; + + private static CassandraServer server; + + @BeforeClass + public static void defineSchema() throws Exception + { + SchemaLoader.prepareServer(); + SchemaLoader.startGossiper(); + new EmbeddedCassandraService().start(); + ThriftSessionManager.instance.setCurrentSocket(new InetSocketAddress(9160)); + for (KSMetaData ksm : SchemaLoader.schemaDefinition(null)) + MigrationManager.announceNewKeyspace(ksm); +// SchemaLoader.createKeyspace(KEYSPACE1, +// SimpleStrategy.class, +// KSMetaData.optsWithRF(1), +// SchemaLoader.perRowIndexedCFMD(KEYSPACE1, CF_INDEXED)); + server = new CassandraServer(); + server.set_keyspace(KEYSPACE1); + } + @Before public void clearTestStub() { @@ -145,6 +189,147 @@ public class PerRowSecondaryIndexTest extends SchemaLoader } } + @Test + public void testInvalidCqlInsert() + { + // test we can insert if the index validates the expression: + QueryProcessor.executeInternal(String.format("INSERT INTO \"%s\".\"Indexed1\" (key, indexed) VALUES ('valid','valid')", KEYSPACE1)); + + // test we can't insert if the index doesn't validate the key: + try + { + QueryProcessor.executeInternal(String.format("INSERT INTO \"%s\".\"Indexed1\" (key, indexed) VALUES ('invalid','valid')", KEYSPACE1)); + fail("Query should have been invalid!"); + } + catch (Exception e) + { + assertTrue(e.getCause() instanceof InvalidRequestException); + } + + // test we can't insert if the index doesn't validate the columns: + try + { + 
QueryProcessor.executeInternal(String.format("INSERT INTO \"%s\".\"Indexed1\" (key, indexed) VALUES ('valid','invalid')", KEYSPACE1)); + fail("Query should have been invalid!"); + } + catch (Exception e) + { + assertTrue(e.getCause() instanceof InvalidRequestException); + } + } + + @Test + public void testInvalidThriftInsert() throws IOException, TException + { + + long timestamp = System.currentTimeMillis(); + ColumnPath cp = new ColumnPath(CF_INDEXED); + ColumnParent par = new ColumnParent(CF_INDEXED); + cp.column = ByteBufferUtil.bytes(INDEXED_COLUMN); + + // test we can insert if the index validates the expression: + ByteBuffer key = ByteBufferUtil.bytes("valid"); + server.insert(key, par, new Column(key).setValue(ByteBufferUtil.bytes("valid")).setTimestamp(timestamp), ConsistencyLevel.ONE); + + // test we can't insert if the index doesn't validate the key: + try + { + key = ByteBufferUtil.bytes("invalid"); + server.insert(key, par, new Column(key).setValue(ByteBufferUtil.bytes("valid")).setTimestamp(timestamp), ConsistencyLevel.ONE); + fail("Query should have been invalid!"); + } + catch (Exception e) + { + assertTrue(e instanceof org.apache.cassandra.thrift.InvalidRequestException); + } + + // test we can't insert if the index doesn't validate the columns: + try + { + key = ByteBufferUtil.bytes("valid"); + server.insert(key, par, new Column(key).setValue(ByteBufferUtil.bytes("invalid")).setTimestamp(timestamp), ConsistencyLevel.ONE); + fail("Query should have been invalid!"); + } + catch (Exception e) + { + assertTrue(e instanceof org.apache.cassandra.thrift.InvalidRequestException); + } + } + + @Test + public void testInvalidThriftCas() throws IOException, TException + { + // test we can insert if the index validates the expression: + ByteBuffer key = ByteBufferUtil.bytes("valid"); + Column column = new Column(key).setValue(ByteBufferUtil.bytes("valid")).setTimestamp(System.currentTimeMillis()); + server.cas(key, CF_INDEXED, Collections.<Column>emptyList(), 
Collections.singletonList(column), ConsistencyLevel.LOCAL_SERIAL, ConsistencyLevel.ONE); + + // test we can't insert if the index doesn't validate the key: + try + { + key = ByteBufferUtil.bytes("invalid"); + server.cas(key, CF_INDEXED, Collections.<Column>emptyList(), Collections.singletonList(column), ConsistencyLevel.LOCAL_SERIAL, ConsistencyLevel.ONE); + fail("Query should have been invalid!"); + } + catch (Exception e) + { + assertTrue(e instanceof org.apache.cassandra.thrift.InvalidRequestException); + } + + // test we can't insert if the index doesn't validate the columns: + try + { + key = ByteBufferUtil.bytes("valid"); + column.setValue(ByteBufferUtil.bytes("invalid")); + server.cas(key, CF_INDEXED, Collections.<Column>emptyList(), Collections.singletonList(column), ConsistencyLevel.LOCAL_SERIAL, ConsistencyLevel.ONE); + fail("Query should have been invalid!"); + } + catch (Exception e) + { + assertTrue(e instanceof org.apache.cassandra.thrift.InvalidRequestException); + } + } + + @Test + public void testInvalidThriftBatchMutate() throws IOException, TException + { + ByteBuffer key = ByteBufferUtil.bytes("valid"); + long timestamp = System.currentTimeMillis(); + + org.apache.cassandra.thrift.Mutation mutation = new org.apache.cassandra.thrift.Mutation(); + Column column = new Column(key).setValue(ByteBufferUtil.bytes("valid")).setTimestamp(System.currentTimeMillis()); + ColumnOrSuperColumn cosc = new ColumnOrSuperColumn(); + cosc.setColumn(column); + mutation.setColumn_or_supercolumn(cosc); + + server.batch_mutate(Collections.singletonMap(key, Collections.singletonMap(CF_INDEXED, Collections.singletonList(mutation))), ConsistencyLevel.ONE); + + // test we can't insert if the index doesn't validate the key: + try + { + key = ByteBufferUtil.bytes("invalid"); + server.batch_mutate(Collections.singletonMap(key, Collections.singletonMap(CF_INDEXED, Collections.singletonList(mutation))), ConsistencyLevel.ONE); + fail("Query should have been invalid!"); + } + 
catch (Exception e) + { + assertTrue(e instanceof org.apache.cassandra.thrift.InvalidRequestException); + } + + // test we can't insert if the index doesn't validate the columns: + try + { + key = ByteBufferUtil.bytes("valid"); + cosc.setColumn(new Column(key).setValue(ByteBufferUtil.bytes("invalid")).setTimestamp(timestamp)); + server.batch_mutate(Collections.singletonMap(key, Collections.singletonMap(CF_INDEXED, Collections.singletonList(mutation))), ConsistencyLevel.ONE); + fail("Query should have been invalid!"); + } + catch (Exception e) + { + assertTrue(e instanceof org.apache.cassandra.thrift.InvalidRequestException); + } + } + public static class TestIndex extends PerRowSecondaryIndex { public static volatile boolean ACTIVE = true; @@ -258,5 +443,21 @@ public class PerRowSecondaryIndexTest extends SchemaLoader public long estimateResultRows() { return 0; } + + @Override + public void validate(ByteBuffer key, ColumnFamily cf) throws InvalidRequestException + { + if (key.equals(ByteBufferUtil.bytes("invalid"))) + { + throw new InvalidRequestException("Invalid key!"); + } + for (Cell cell : cf) + { + if (cell.value().equals(ByteBufferUtil.bytes("invalid"))) + { + throw new InvalidRequestException("Invalid column!"); + } + } + } } }