Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 32a6f2059 -> 7f1fec190
  refs/heads/cassandra-2.2 b154622b6 -> ca8e9a97b
  refs/heads/cassandra-3.0 a8014bb7e -> 3d986936f
  refs/heads/trunk 9df213941 -> ef1b4c0f5


Add validation method to PerRowSecondaryIndex

Patch by Andrés de la Peña; reviewed by Sam Tunnicliffe for
CASSANDRA-10092


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7f1fec19
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7f1fec19
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7f1fec19

Branch: refs/heads/cassandra-2.1
Commit: 7f1fec19080c423d89ce3af823e2b1532b755035
Parents: 32a6f20
Author: Andrés de la Peña <a.penya.gar...@gmail.com>
Authored: Wed Oct 28 16:41:12 2015 +0000
Committer: Sam Tunnicliffe <s...@beobal.com>
Committed: Thu Oct 29 10:49:19 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   2 +
 .../cql3/statements/UpdateStatement.java        |   1 +
 .../db/index/PerRowSecondaryIndex.java          |   5 +
 .../db/index/SecondaryIndexManager.java         |   8 +
 .../cassandra/thrift/CassandraServer.java       |  11 +
 .../db/index/PerRowSecondaryIndexTest.java      | 211 ++++++++++++++++++-
 7 files changed, 234 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f1fec19/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 998dd22..3d22b91 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
  * Support encrypted and plain traffic on the same port (CASSANDRA-10559)
  * Do STCS in DTCS windows (CASSANDRA-10276)
  * Don't try to get ancestors from half-renamed sstables (CASSANDRA-10501)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f1fec19/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index c6ea6c0..712a2c4 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -25,6 +25,8 @@ New features
       If moving from the SimpleSnitch, make sure the rack containing all current
       nodes is named "rack1". To override this behavior when manually wiping
       the node and bootstrapping, use -Dcassandra.ignore_rack=true.
+    - a new validate(key, cf) method is added to PerRowSecondaryIndex. A default
+      implementation is provided, so no changes are required to custom implementations.
 
 
 2.1.11

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f1fec19/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 06282ad..bf9a059 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -141,6 +141,7 @@ public class UpdateStatement extends ModificationStatement
                                                                     
cfm.cfName));
                 }
             }
+            indexManager.validateRowLevelIndexes(key, cf);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f1fec19/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
index f6f0e8d..5a3d457 100644
--- a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.index;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 
+import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.db.Cell;
@@ -69,4 +70,8 @@ public abstract class PerRowSecondaryIndex extends SecondaryIndex
     {
         return true;
     }
+
+    public void validate(ByteBuffer key, ColumnFamily cf) throws InvalidRequestException
+    {
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f1fec19/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 12a0a55..179126b 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -683,6 +683,14 @@ public class SecondaryIndexManager
         return null;
     }
 
+    public void validateRowLevelIndexes(ByteBuffer key, ColumnFamily cf) throws InvalidRequestException
+    {
+        for (SecondaryIndex index : rowLevelIndexMap.values())
+        {
+            ((PerRowSecondaryIndex) index).validate(key, cf);
+        }
+    }
+
     static boolean shouldCleanupOldValue(Cell oldCell, Cell newCell)
     {
         // If any one of name/value/timestamp are different, then we

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f1fec19/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index fd87b57..7103721 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -684,6 +684,10 @@ public class CassandraServer implements Cassandra.Iface
 
             ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cState.getKeyspace(), column_parent.column_family);
             cf.addColumn(name, column.value, column.timestamp, column.ttl);
+
+            // Validate row level indexes. See CASSANDRA-10092 for more details.
+            Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validateRowLevelIndexes(key, cf);
+
             mutation = new org.apache.cassandra.db.Mutation(cState.getKeyspace(), key, cf);
         }
         catch (MarshalException e)
@@ -778,6 +782,9 @@ public class CassandraServer implements Cassandra.Iface
             for (Column column : updates)
                 cfUpdates.addColumn(cfm.comparator.cellFromByteBuffer(column.name), column.value, column.timestamp);
 
+            // Validate row level indexes. See CASSANDRA-10092 for more details.
+            Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validateRowLevelIndexes(key, cfUpdates);
+
             ColumnFamily cfExpected;
             if (expected.isEmpty())
             {
@@ -874,6 +881,10 @@ public class CassandraServer implements Cassandra.Iface
                         addColumnOrSuperColumn(mutation, metadata, m.column_or_supercolumn);
                     }
                 }
+
+                // Validate row level indexes. See CASSANDRA-10092 for more details.
+                ColumnFamily cf = mutation.addOrGet(metadata);
+                Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validateRowLevelIndexes(key, cf);
             }
             if (standardMutation != null && !standardMutation.isEmpty())
                 mutations.add(standardMutation);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f1fec19/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
index 2e885aa..a332342 100644
--- a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.index;
 
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
@@ -26,28 +27,48 @@ import java.util.List;
 import java.util.Set;
 
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IndexExpression;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.thrift.CassandraServer;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ColumnPath;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.ThriftSessionManager;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.thrift.TException;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
-public class PerRowSecondaryIndexTest extends SchemaLoader
+public class PerRowSecondaryIndexTest
 {
 
     // test that when index(key) is called on a PRSI index,
@@ -56,6 +77,29 @@ public class PerRowSecondaryIndexTest extends SchemaLoader
     // indexed & stashes it in a static variable for inspection
     // in the test.
 
+    private static final String KEYSPACE1 = "PerRowSecondaryIndex";
+    private static final String CF_INDEXED = "Indexed1";
+    private static final String INDEXED_COLUMN = "indexed";
+
+    private static CassandraServer server;
+
+    @BeforeClass
+    public static void defineSchema() throws Exception
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.startGossiper();
+        new EmbeddedCassandraService().start();
+        ThriftSessionManager.instance.setCurrentSocket(new 
InetSocketAddress(9160));
+        for (KSMetaData ksm : SchemaLoader.schemaDefinition(null))
+                MigrationManager.announceNewKeyspace(ksm);
+//        SchemaLoader.createKeyspace(KEYSPACE1,
+//                                    SimpleStrategy.class,
+//                                    KSMetaData.optsWithRF(1),
+//                                    
SchemaLoader.perRowIndexedCFMD(KEYSPACE1, CF_INDEXED));
+        server = new CassandraServer();
+        server.set_keyspace(KEYSPACE1);
+    }
+
     @Before
     public void clearTestStub()
     {
@@ -145,6 +189,147 @@ public class PerRowSecondaryIndexTest extends SchemaLoader
         }
     }
 
+    @Test
+    public void testInvalidCqlInsert()
+    {
+        // test we can insert if the index validates the expression:
+        QueryProcessor.executeInternal(String.format("INSERT INTO 
\"%s\".\"Indexed1\" (key, indexed) VALUES ('valid','valid')", KEYSPACE1));
+
+        // test we can't insert if the index doesn't validate the key:
+        try
+        {
+            QueryProcessor.executeInternal(String.format("INSERT INTO 
\"%s\".\"Indexed1\" (key, indexed) VALUES ('invalid','valid')", KEYSPACE1));
+            fail("Query should have been invalid!");
+        }
+        catch (Exception e)
+        {
+            assertTrue(e.getCause() instanceof InvalidRequestException);
+        }
+
+        // test we can't insert if the index doesn't validate the columns:
+        try
+        {
+            QueryProcessor.executeInternal(String.format("INSERT INTO 
\"%s\".\"Indexed1\" (key, indexed) VALUES ('valid','invalid')", KEYSPACE1));
+            fail("Query should have been invalid!");
+        }
+        catch (Exception e)
+        {
+            assertTrue(e.getCause() instanceof InvalidRequestException);
+        }
+    }
+
+    @Test
+    public void testInvalidThriftInsert() throws IOException, TException
+    {
+
+        long timestamp = System.currentTimeMillis();
+        ColumnPath cp = new ColumnPath(CF_INDEXED);
+        ColumnParent par = new ColumnParent(CF_INDEXED);
+        cp.column = ByteBufferUtil.bytes(INDEXED_COLUMN);
+
+        // test we can insert if the index validates the expression:
+        ByteBuffer key = ByteBufferUtil.bytes("valid");
+        server.insert(key, par, new 
Column(key).setValue(ByteBufferUtil.bytes("valid")).setTimestamp(timestamp), 
ConsistencyLevel.ONE);
+
+        // test we can't insert if the index doesn't validate the key:
+        try
+        {
+            key = ByteBufferUtil.bytes("invalid");
+            server.insert(key, par, new 
Column(key).setValue(ByteBufferUtil.bytes("valid")).setTimestamp(timestamp), 
ConsistencyLevel.ONE);
+            fail("Query should have been invalid!");
+        }
+        catch (Exception e)
+        {
+            assertTrue(e instanceof 
org.apache.cassandra.thrift.InvalidRequestException);
+        }
+
+        // test we can't insert if the index doesn't validate the columns:
+        try
+        {
+            key = ByteBufferUtil.bytes("valid");
+            server.insert(key, par, new 
Column(key).setValue(ByteBufferUtil.bytes("invalid")).setTimestamp(timestamp), 
ConsistencyLevel.ONE);
+            fail("Query should have been invalid!");
+        }
+        catch (Exception e)
+        {
+            assertTrue(e instanceof 
org.apache.cassandra.thrift.InvalidRequestException);
+        }
+    }
+
+    @Test
+    public void testInvalidThriftCas() throws IOException, TException
+    {
+        // test we can insert if the index validates the expression:
+        ByteBuffer key = ByteBufferUtil.bytes("valid");
+        Column column = new 
Column(key).setValue(ByteBufferUtil.bytes("valid")).setTimestamp(System.currentTimeMillis());
+        server.cas(key, CF_INDEXED, Collections.<Column>emptyList(), 
Collections.singletonList(column), ConsistencyLevel.LOCAL_SERIAL, 
ConsistencyLevel.ONE);
+
+        // test we can't insert if the index doesn't validate the key:
+        try
+        {
+            key = ByteBufferUtil.bytes("invalid");
+            server.cas(key, CF_INDEXED, Collections.<Column>emptyList(), 
Collections.singletonList(column), ConsistencyLevel.LOCAL_SERIAL, 
ConsistencyLevel.ONE);
+            fail("Query should have been invalid!");
+        }
+        catch (Exception e)
+        {
+            assertTrue(e instanceof 
org.apache.cassandra.thrift.InvalidRequestException);
+        }
+
+        // test we can't insert if the index doesn't validate the columns:
+        try
+        {
+            key = ByteBufferUtil.bytes("valid");
+            column.setValue(ByteBufferUtil.bytes("invalid"));
+            server.cas(key, CF_INDEXED, Collections.<Column>emptyList(), 
Collections.singletonList(column), ConsistencyLevel.LOCAL_SERIAL, 
ConsistencyLevel.ONE);
+            fail("Query should have been invalid!");
+        }
+        catch (Exception e)
+        {
+            assertTrue(e instanceof 
org.apache.cassandra.thrift.InvalidRequestException);
+        }
+    }
+
+    @Test
+    public void testInvalidThriftBatchMutate() throws IOException, TException
+    {
+        ByteBuffer key = ByteBufferUtil.bytes("valid");
+        long timestamp = System.currentTimeMillis();
+
+        org.apache.cassandra.thrift.Mutation mutation = new 
org.apache.cassandra.thrift.Mutation();
+        Column column = new 
Column(key).setValue(ByteBufferUtil.bytes("valid")).setTimestamp(System.currentTimeMillis());
+        ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
+        cosc.setColumn(column);
+        mutation.setColumn_or_supercolumn(cosc);
+
+        server.batch_mutate(Collections.singletonMap(key, 
Collections.singletonMap(CF_INDEXED, Collections.singletonList(mutation))), 
ConsistencyLevel.ONE);
+
+        // test we can't insert if the index doesn't validate the key:
+        try
+        {
+            key = ByteBufferUtil.bytes("invalid");
+            server.batch_mutate(Collections.singletonMap(key, 
Collections.singletonMap(CF_INDEXED, Collections.singletonList(mutation))), 
ConsistencyLevel.ONE);
+            fail("Query should have been invalid!");
+        }
+        catch (Exception e)
+        {
+            assertTrue(e instanceof 
org.apache.cassandra.thrift.InvalidRequestException);
+        }
+
+        // test we can't insert if the index doesn't validate the columns:
+        try
+        {
+            key = ByteBufferUtil.bytes("valid");
+            cosc.setColumn(new 
Column(key).setValue(ByteBufferUtil.bytes("invalid")).setTimestamp(timestamp));
+            server.batch_mutate(Collections.singletonMap(key, 
Collections.singletonMap(CF_INDEXED, Collections.singletonList(mutation))), 
ConsistencyLevel.ONE);
+            fail("Query should have been invalid!");
+        }
+        catch (Exception e)
+        {
+            assertTrue(e instanceof 
org.apache.cassandra.thrift.InvalidRequestException);
+        }
+    }
+
     public static class TestIndex extends PerRowSecondaryIndex
     {
         public static volatile boolean ACTIVE = true;
@@ -258,5 +443,21 @@ public class PerRowSecondaryIndexTest extends SchemaLoader
         public long estimateResultRows() {
             return 0;
         }
+
+        @Override
+        public void validate(ByteBuffer key, ColumnFamily cf) throws 
InvalidRequestException
+        {
+            if (key.equals(ByteBufferUtil.bytes("invalid")))
+            {
+                throw new InvalidRequestException("Invalid key!");
+            }
+            for (Cell cell : cf)
+            {
+                if (cell.value().equals(ByteBufferUtil.bytes("invalid")))
+                {
+                    throw new InvalidRequestException("Invalid column!");
+                }
+            }
+        }
     }
 }

Reply via email to