Ensure indexed values are only validated once per partition. Patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for CASSANDRA-10536.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aad3ae2c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aad3ae2c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aad3ae2c Branch: refs/heads/cassandra-3.0 Commit: aad3ae2cbec85ca36d3caacbe68aebe1e552f41b Parents: 89293ef Author: Sam Tunnicliffe <s...@beobal.com> Authored: Fri Oct 16 13:53:52 2015 +0100 Committer: Sam Tunnicliffe <s...@beobal.com> Committed: Fri Oct 16 17:59:51 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/cql3/UpdateParameters.java | 27 +------- .../cql3/statements/BatchStatement.java | 10 ++- .../cql3/statements/CQL3CasRequest.java | 14 ++-- .../cql3/statements/DeleteStatement.java | 2 - .../cql3/statements/ModificationStatement.java | 4 +- .../cql3/statements/UpdateStatement.java | 2 - .../cql3/statements/UpdatesCollector.java | 20 ++++-- .../cassandra/io/sstable/CQLSSTableWriter.java | 8 +-- .../validation/entities/SecondaryIndexTest.java | 72 ++++++++++++++++++++ .../index/internal/CassandraIndexTest.java | 55 +++++++++++++++ 11 files changed, 164 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aad3ae2c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e2d9dd7..77facc4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0-rc2 + * Ensure validation of indexed values only occurs once per-partition (CASSANDRA-10536) * Fix handling of static columns for range tombstones in thrift (CASSANDRA-10174) * Support empty ColumnFilter for backward compatility on empty IN (CASSANDRA-10471) * Remove Pig support (CASSANDRA-10542) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aad3ae2c/src/java/org/apache/cassandra/cql3/UpdateParameters.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java index 03468f0..572365b 100644 --- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java +++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java @@ -26,10 +26,8 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.partitions.Partition; -import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.index.SecondaryIndexManager; import org.apache.cassandra.utils.FBUtilities; /** @@ -47,8 +45,6 @@ public class UpdateParameters private final DeletionTime deletionTime; - private final SecondaryIndexManager indexManager; - // For lists operation that require a read-before-write. Will be null otherwise. private final Map<DecoratedKey, Partition> prefetchedRows; @@ -63,8 +59,7 @@ public class UpdateParameters QueryOptions options, long timestamp, int ttl, - Map<DecoratedKey, Partition> prefetchedRows, - boolean validateIndexedColumns) + Map<DecoratedKey, Partition> prefetchedRows) throws InvalidRequestException { this.metadata = metadata; @@ -79,32 +74,12 @@ public class UpdateParameters this.prefetchedRows = prefetchedRows; - // Index column validation triggers a call to Keyspace.open() which we want - // to be able to avoid in some case (e.g. when using CQLSSTableWriter) - if (validateIndexedColumns) - { - SecondaryIndexManager manager = Keyspace.openAndGetStore(metadata).indexManager; - indexManager = manager.hasIndexes() ? 
manager : null; - } - else - { - indexManager = null; - } - // We use MIN_VALUE internally to mean the absence of of timestamp (in Selection, in sstable stats, ...), so exclude // it to avoid potential confusion. if (timestamp == Long.MIN_VALUE) throw new InvalidRequestException(String.format("Out of bound timestamp, must be in [%d, %d]", Long.MIN_VALUE + 1, Long.MAX_VALUE)); } - public void validateIndexedColumns(PartitionUpdate update) - { - if (indexManager == null) - return; - - indexManager.validate(update); - } - public void newRow(Clustering clustering) throws InvalidRequestException { if (metadata.isDense() && !metadata.isCompound()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aad3ae2c/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 0b3e1ba..d63bbc8 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -23,21 +23,18 @@ import java.util.concurrent.TimeUnit; import com.google.common.base.Function; import com.google.common.collect.Iterables; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.helpers.MessageFormatter; + import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.*; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.rows.RowIterator; -import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.exceptions.*; -import org.apache.cassandra.service.ClientState; -import org.apache.cassandra.service.ClientWarn; -import org.apache.cassandra.service.QueryState; -import org.apache.cassandra.service.StorageProxy; +import 
org.apache.cassandra.service.*; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.NoSpamLogger; @@ -242,6 +239,7 @@ public class BatchStatement implements CQLStatement .getMessage()); } + collector.validateIndexedColumns(); return collector.toMutations(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aad3ae2c/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java index 1c3c795..41aef83 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java +++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java @@ -22,11 +22,15 @@ import java.util.*; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; -import org.apache.cassandra.cql3.*; + import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.cql3.*; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.filter.*; -import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.partitions.FilteredPartition; +import org.apache.cassandra.db.partitions.Partition; +import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.service.CASRequest; import org.apache.cassandra.utils.Pair; @@ -170,6 +174,8 @@ public class CQL3CasRequest implements CASRequest for (RowUpdate upd : updates) upd.applyUpdates(current, update); + Keyspace.openAndGetStore(cfm).indexManager.validate(update); + if (isBatch) BatchStatement.verifyBatchSize(Collections.singleton(update)); @@ -200,7 +206,7 @@ public 
class CQL3CasRequest implements CASRequest public void applyUpdates(FilteredPartition current, PartitionUpdate updates) throws InvalidRequestException { Map<DecoratedKey, Partition> map = stmt.requiresRead() ? Collections.<DecoratedKey, Partition>singletonMap(key, current) : null; - UpdateParameters params = new UpdateParameters(cfm, updates.columns(), options, timestamp, stmt.getTimeToLive(options), map, true); + UpdateParameters params = new UpdateParameters(cfm, updates.columns(), options, timestamp, stmt.getTimeToLive(options), map); stmt.addUpdateForKey(updates, clustering, params); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aad3ae2c/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java index da188a9..d51f261 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java @@ -93,8 +93,6 @@ public class DeleteStatement extends ModificationStatement update.add(params.buildRow()); } } - - params.validateIndexedColumns(update); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/aad3ae2c/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 23a26d0..0e989e6 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -604,6 +604,8 @@ public abstract class ModificationStatement implements CQLStatement { UpdatesCollector collector = new 
UpdatesCollector(updatedColumns, 1); addUpdates(collector, options, local, now); + collector.validateIndexedColumns(); + return collector.toMutations(); } @@ -706,7 +708,7 @@ public abstract class ModificationStatement implements CQLStatement { // Some lists operation requires reading Map<DecoratedKey, Partition> lists = readRequiredLists(keys, filter, limits, local, options.getConsistency()); - return new UpdateParameters(cfm, updatedColumns(), options, getTimestamp(now, options), getTimeToLive(options), lists, true); + return new UpdateParameters(cfm, updatedColumns(), options, getTimestamp(now, options), getTimeToLive(options), lists); } private Slices toSlices(SortedSet<Slice.Bound> startBounds, SortedSet<Slice.Bound> endBounds) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aad3ae2c/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java index ce9aaee..d6d0266 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java @@ -103,8 +103,6 @@ public class UpdateStatement extends ModificationStatement op.execute(update.partitionKey(), params); update.add(params.buildRow()); } - - params.validateIndexedColumns(update); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/aad3ae2c/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java index f291000..8fc5ef5 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java +++ 
b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java @@ -20,13 +20,9 @@ package org.apache.cassandra.cql3.statements; import java.nio.ByteBuffer; import java.util.*; -import org.apache.cassandra.db.CounterMutation; - -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.db.PartitionColumns; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.partitions.PartitionUpdate; /** * Utility class to collect updates. @@ -80,6 +76,18 @@ final class UpdatesCollector return upd; } + /** + * Check all partition updates contain only valid values for any + * indexed columns. + */ + public void validateIndexedColumns() + { + for (Map<ByteBuffer, IMutation> perKsMutations : mutations.values()) + for (IMutation mutation : perKsMutations.values()) + for (PartitionUpdate update : mutation.getPartitionUpdates()) + Keyspace.openAndGetStore(update.metadata()).indexManager.validate(update); + } + private Mutation getMutation(CFMetaData cfm, DecoratedKey dk, ConsistencyLevel consistency) { String ksName = cfm.ksName; @@ -127,4 +135,4 @@ final class UpdatesCollector } return ksMap; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/aad3ae2c/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index 4a2af66..70380f4 100644 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@ -28,9 +28,10 @@ import org.apache.cassandra.cql3.*; import org.apache.cassandra.cql3.statements.CreateTableStatement; import org.apache.cassandra.cql3.statements.ParsedStatement; import 
org.apache.cassandra.cql3.statements.UpdateStatement; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.exceptions.InvalidRequestException; @@ -216,8 +217,7 @@ public class CQLSSTableWriter implements Closeable options, insert.getTimestamp(now, options), insert.getTimeToLive(options), - Collections.<DecoratedKey, Partition>emptyMap(), - false); + Collections.<DecoratedKey, Partition>emptyMap()); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/aad3ae2c/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java index 472149d..9cba01a 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java @@ -25,6 +25,7 @@ import org.junit.Test; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.cql3.statements.IndexTarget; @@ -555,12 +556,23 @@ public class SecondaryIndexTest extends CQLTester // CASSANDRA-8280/8081 // reject updates with indexed values where value > 64k + // make sure we check conditional and unconditional statements, + // both singly and in batches (CASSANDRA-10536) @Test public void 
testIndexOnCompositeValueOver64k() throws Throwable { createTable("CREATE TABLE %s(a int, b int, c blob, PRIMARY KEY (a))"); createIndex("CREATE INDEX ON %s(c)"); failInsert("INSERT INTO %s (a, b, c) VALUES (0, 0, ?)", ByteBuffer.allocate(TOO_BIG)); + failInsert("INSERT INTO %s (a, b, c) VALUES (0, 0, ?) IF NOT EXISTS", ByteBuffer.allocate(TOO_BIG)); + failInsert("BEGIN BATCH\n" + + "INSERT INTO %s (a, b, c) VALUES (0, 0, ?);\n" + + "APPLY BATCH", + ByteBuffer.allocate(TOO_BIG)); + failInsert("BEGIN BATCH\n" + + "INSERT INTO %s (a, b, c) VALUES (0, 0, ?) IF NOT EXISTS;\n" + + "APPLY BATCH", + ByteBuffer.allocate(TOO_BIG)); } @Test @@ -569,6 +581,15 @@ public class SecondaryIndexTest extends CQLTester createTable("CREATE TABLE %s(a int, b blob, PRIMARY KEY (a)) WITH COMPACT STORAGE"); createIndex("CREATE INDEX ON %s(b)"); failInsert("INSERT INTO %s (a, b) VALUES (0, ?)", ByteBuffer.allocate(TOO_BIG)); + failInsert("INSERT INTO %s (a, b) VALUES (0, ?) IF NOT EXISTS", ByteBuffer.allocate(TOO_BIG)); + failInsert("BEGIN BATCH\n" + + "INSERT INTO %s (a, b) VALUES (0, ?);\n" + + "APPLY BATCH", + ByteBuffer.allocate(TOO_BIG)); + failInsert("BEGIN BATCH\n" + + "INSERT INTO %s (a, b) VALUES (0, ?) IF NOT EXISTS;\n" + + "APPLY BATCH", + ByteBuffer.allocate(TOO_BIG)); } @Test @@ -576,7 +597,29 @@ public class SecondaryIndexTest extends CQLTester { createTable("CREATE TABLE %s(a int, b int, c blob, PRIMARY KEY ((a, b)))"); createIndex("CREATE INDEX ON %s(a)"); + succeedInsert("INSERT INTO %s (a, b, c) VALUES (0, 0, ?) 
IF NOT EXISTS", ByteBuffer.allocate(TOO_BIG)); succeedInsert("INSERT INTO %s (a, b, c) VALUES (0, 0, ?)", ByteBuffer.allocate(TOO_BIG)); + succeedInsert("BEGIN BATCH\n" + + "INSERT INTO %s (a, b, c) VALUES (0, 0, ?);\n" + + "APPLY BATCH", ByteBuffer.allocate(TOO_BIG)); + + // the indexed value passes validation, but the batch size will + // exceed the default failure threshold, so temporarily raise it + // (the non-conditional batch doesn't hit this because + // BatchStatement::executeInternal skips the size check but CAS + // path does not) + long batchSizeThreshold = DatabaseDescriptor.getBatchSizeFailThreshold(); + try + { + DatabaseDescriptor.setBatchSizeFailThresholdInKB( (TOO_BIG / 1024) * 2); + succeedInsert("BEGIN BATCH\n" + + "INSERT INTO %s (a, b, c) VALUES (1, 1, ?) IF NOT EXISTS;\n" + + "APPLY BATCH", ByteBuffer.allocate(TOO_BIG)); + } + finally + { + DatabaseDescriptor.setBatchSizeFailThresholdInKB((int) (batchSizeThreshold / 1024)); + } } @Test @@ -584,7 +627,29 @@ public class SecondaryIndexTest extends CQLTester { createTable("CREATE TABLE %s(a int, b int, c blob, PRIMARY KEY (a, b))"); createIndex("CREATE INDEX ON %s(b)"); + succeedInsert("INSERT INTO %s (a, b, c) VALUES (0, 0, ?) 
IF NOT EXISTS", ByteBuffer.allocate(TOO_BIG)); succeedInsert("INSERT INTO %s (a, b, c) VALUES (0, 0, ?)", ByteBuffer.allocate(TOO_BIG)); + succeedInsert("BEGIN BATCH\n" + + "INSERT INTO %s (a, b, c) VALUES (0, 0, ?);\n" + + "APPLY BATCH", ByteBuffer.allocate(TOO_BIG)); + + // the indexed value passes validation, but the batch size will + // exceed the default failure threshold, so temporarily raise it + // (the non-conditional batch doesn't hit this because + // BatchStatement::executeInternal skips the size check but CAS + // path does not) + long batchSizeThreshold = DatabaseDescriptor.getBatchSizeFailThreshold(); + try + { + DatabaseDescriptor.setBatchSizeFailThresholdInKB( (TOO_BIG / 1024) * 2); + succeedInsert("BEGIN BATCH\n" + + "INSERT INTO %s (a, b, c) VALUES (1, 1, ?) IF NOT EXISTS;\n" + + "APPLY BATCH", ByteBuffer.allocate(TOO_BIG)); + } + finally + { + DatabaseDescriptor.setBatchSizeFailThresholdInKB((int)(batchSizeThreshold / 1024)); + } } @Test @@ -595,6 +660,13 @@ public class SecondaryIndexTest extends CQLTester Map<Integer, ByteBuffer> map = new HashMap(); map.put(0, ByteBuffer.allocate(1024 * 65)); failInsert("INSERT INTO %s (a, b) VALUES (0, ?)", map); + failInsert("INSERT INTO %s (a, b) VALUES (0, ?) IF NOT EXISTS", map); + failInsert("BEGIN BATCH\n" + + "INSERT INTO %s (a, b) VALUES (0, ?);\n" + + "APPLY BATCH", map); + failInsert("BEGIN BATCH\n" + + "INSERT INTO %s (a, b) VALUES (0, ?) 
IF NOT EXISTS;\n" + + "APPLY BATCH", map); } public void failInsert(String insertCQL, Object...args) throws Throwable http://git-wip-us.apache.org/repos/asf/cassandra/blob/aad3ae2c/test/unit/org/apache/cassandra/index/internal/CassandraIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/internal/CassandraIndexTest.java b/test/unit/org/apache/cassandra/index/internal/CassandraIndexTest.java index 73ce6c0..34cbe16 100644 --- a/test/unit/org/apache/cassandra/index/internal/CassandraIndexTest.java +++ b/test/unit/org/apache/cassandra/index/internal/CassandraIndexTest.java @@ -413,6 +413,61 @@ public class CassandraIndexTest extends CQLTester assertIndexRowTtl(indexCfs, indexedVal, updatedTtl); } + @Test + public void indexBatchStatements() throws Throwable + { + // see CASSANDRA-10536 + createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); + createIndex("CREATE INDEX ON %s(c)"); + + // Multi partition batch + execute("BEGIN BATCH\n" + + "UPDATE %1$s SET c = 0 WHERE a = 0 AND b = 0;\n" + + "UPDATE %1$s SET c = 1 WHERE a = 1 AND b = 1;\n" + + "APPLY BATCH"); + assertRows(execute("SELECT * FROM %s WHERE c = 0"), row(0, 0, 0)); + assertRows(execute("SELECT * FROM %s WHERE c = 1"), row(1, 1, 1)); + + // Single Partition batch + execute("BEGIN BATCH\n" + + "UPDATE %1$s SET c = 2 WHERE a = 2 AND b = 0;\n" + + "UPDATE %1$s SET c = 3 WHERE a = 2 AND b = 1;\n" + + "APPLY BATCH"); + assertRows(execute("SELECT * FROM %s WHERE c = 2"), row(2, 0, 2)); + assertRows(execute("SELECT * FROM %s WHERE c = 3"), row(2, 1, 3)); + } + + @Test + public void indexStatementsWithConditions() throws Throwable + { + // see CASSANDRA-10536 + createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); + createIndex("CREATE INDEX ON %s(c)"); + + execute("INSERT INTO %s (a, b, c) VALUES (0, 0, 0) IF NOT EXISTS"); + assertRows(execute("SELECT * FROM %s WHERE c = 0"), row(0, 0, 0)); + + 
execute("INSERT INTO %s (a, b, c) VALUES (0, 0, 1) IF NOT EXISTS"); + assertEmpty(execute("SELECT * FROM %s WHERE c = 1")); + + execute("UPDATE %s SET c = 1 WHERE a = 0 AND b =0 IF c = 0"); + assertRows(execute("SELECT * FROM %s WHERE c = 1"), row(0, 0, 1)); + assertEmpty(execute("SELECT * FROM %s WHERE c = 0")); + + execute("DELETE FROM %s WHERE a = 0 AND b = 0 IF c = 0"); + assertRows(execute("SELECT * FROM %s WHERE c = 1"), row(0, 0, 1)); + + execute("DELETE FROM %s WHERE a = 0 AND b = 0 IF c = 1"); + assertEmpty(execute("SELECT * FROM %s WHERE c = 1")); + + execute("BEGIN BATCH\n" + + "INSERT INTO %1$s (a, b, c) VALUES (2, 2, 2) IF NOT EXISTS;\n" + + "INSERT INTO %1$s (a, b, c) VALUES (2, 3, 3)\n" + + "APPLY BATCH"); + assertRows(execute("SELECT * FROM %s WHERE c = 2"), row(2, 2, 2)); + assertRows(execute("SELECT * FROM %s WHERE c = 3"), row(2, 3, 3)); + } + // this is slightly annoying, but we cannot read rows from the methods in Util as // ReadCommand#executeInternal uses metadata retrieved via the cfId, which the index // CFS inherits from the base CFS. This has the 'wrong' partitioner (the index table