This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-3.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push: new c76ff1b Fix secondary indexes on primary key columns skipping some writes c76ff1b is described below commit c76ff1ba14487d521c49d4b830b2d718d170b2e1 Author: Andrés de la Peña <a.penya.gar...@gmail.com> AuthorDate: Tue Aug 24 17:57:53 2021 +0100 Fix secondary indexes on primary key columns skipping some writes patch by Andrés de la Peña; reviewed by Benjamin Lerer for CASSANDRA-16868 --- CHANGES.txt | 1 + .../cassandra/index/internal/CassandraIndex.java | 5 +- .../validation/entities/SecondaryIndexTest.java | 140 ++++++++++++++++++++- 3 files changed, 139 insertions(+), 7 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 4a8288f..9089836 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.26: + * Fix secondary indexes on primary key columns skipping some writes (CASSANDRA-16868) * Fix incorrect error message in LegacyLayout (CASSANDRA-15136) * Use JMX to validate nodetool --jobs parameter (CASSANDRA-16104) * Handle properly UnsatisfiedLinkError in NativeLibrary#getProcessID() (CASSANDRA-16578) diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java index ad5dd4b..de3a974 100644 --- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java +++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java @@ -436,7 +436,7 @@ public abstract class CassandraIndex implements Index if (isPrimaryKeyIndex()) indexPrimaryKey(newRow.clustering(), - newRow.primaryKeyLivenessInfo(), + getPrimaryKeyIndexLiveness(newRow), newRow.deletion()); if (indexedColumn.isComplex()) @@ -514,10 +514,7 @@ public abstract class CassandraIndex implements Index if (cell.isLive(nowInSec)) { if (cellTimestamp > timestamp) - { timestamp = cellTimestamp; - ttl = cell.ttl(); - } } } return LivenessInfo.create(baseCfs.metadata, timestamp, ttl, nowInSec); diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java index c2640a0..201571e 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java @@ -1235,39 +1235,144 @@ public class SecondaryIndexTest extends CQLTester @Test public void testIndexOnPartitionKeyInsertExpiringColumn() throws Throwable { + testIndexOnPartitionKeyInsertExpiringColumn(false); + } + + @Test + public void testIndexOnPartitionKeyInsertExpiringColumnWithFlush() throws Throwable + { + testIndexOnPartitionKeyInsertExpiringColumn(true); + } + + private void testIndexOnPartitionKeyInsertExpiringColumn(boolean flushBeforeUpdate) throws Throwable + { createTable("CREATE TABLE %s (k1 int, k2 int, a int, b int, PRIMARY KEY ((k1, k2)))"); - createIndex("CREATE INDEX on %s(k1)"); + createIndex("CREATE INDEX ON %s(k1)"); execute("INSERT INTO %s (k1, k2, a, b) VALUES (1, 2, 3, 4)"); assertRows(execute("SELECT * FROM %s WHERE k1 = 1"), row(1, 2, 3, 4)); + + if (flushBeforeUpdate) + flush(); + execute("UPDATE %s USING TTL 1 SET b = 10 WHERE k1 = 1 AND k2 = 2"); Thread.sleep(1000); assertRows(execute("SELECT * FROM %s WHERE k1 = 1"), row(1, 2, 3, null)); } @Test + public void testIndexOnPartitionKeyOverridingExpiredRow() throws Throwable + { + createTable("CREATE TABLE %s (k1 int, k2 int, v int, PRIMARY KEY ((k1, k2)))"); + createIndex("CREATE INDEX ON %s(k1)"); + + execute("UPDATE %s USING TTL 1 SET v = 3 WHERE k1 = 1 AND k2 = 2"); + Thread.sleep(1000); + + assertEmpty(execute("SELECT * FROM %s WHERE k1 = 1")); + + execute("UPDATE %s SET v = 3 WHERE k1 = 1 AND k2 = 2"); + assertRows(execute("SELECT * FROM %s WHERE k1 = 1"), row(1, 2, 3)); + } + + @Test + public void testIndexOnPartitionKeyOverridingDeletedRow() throws Throwable + { + createTable("CREATE TABLE %s (k1 int, k2 int, c int, v 
int, PRIMARY KEY ((k1, k2), c))"); + createIndex("CREATE INDEX ON %s(k1)"); + + execute("INSERT INTO %s(k1, k2, c, v) VALUES (1, 2, 3, 4)"); + execute("DELETE FROM %s WHERE k1 = 1 AND k2 = 2 AND c = 3"); + execute("UPDATE %s SET v = 4 WHERE k1 = 1 AND k2 = 2 AND c = 3"); + + assertRows(execute("SELECT * FROM %s WHERE k1 = 1 AND k2 = 2 AND c = 3"), row(1, 2, 3, 4)); + assertRows(execute("SELECT * FROM %s WHERE k1 = 1"), row(1, 2, 3, 4)); + } + + @Test public void testIndexOnClusteringKeyInsertExpiringColumn() throws Throwable { + testIndexOnClusteringKeyInsertExpiringColumn(false); + } + + @Test + public void testIndexOnClusteringKeyInsertExpiringColumnWithFlush() throws Throwable + { + testIndexOnClusteringKeyInsertExpiringColumn(true); + } + + private void testIndexOnClusteringKeyInsertExpiringColumn(boolean flushBeforeUpdate) throws Throwable + { createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))"); - createIndex("CREATE INDEX on %s(ck)"); + createIndex("CREATE INDEX ON %s(ck)"); execute("INSERT INTO %s (pk, ck, a, b) VALUES (1, 2, 3, 4)"); assertRows(execute("SELECT * FROM %s WHERE ck = 2"), row(1, 2, 3, 4)); + + if (flushBeforeUpdate) + flush(); + execute("UPDATE %s USING TTL 1 SET b = 10 WHERE pk = 1 AND ck = 2"); Thread.sleep(1000); assertRows(execute("SELECT * FROM %s WHERE ck = 2"), row(1, 2, 3, null)); } @Test + public void testIndexOnClusteringKeyOverridingExpiredRow() throws Throwable + { + createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + createIndex("CREATE INDEX ON %s(ck)"); + + execute("UPDATE %s USING TTL 1 SET v = 3 WHERE pk = 1 AND ck = 2"); + Thread.sleep(1000); + + assertEmpty(execute("SELECT * FROM %s WHERE ck = 2")); + + execute("UPDATE %s SET v = 3 WHERE pk = 1 AND ck = 2"); + assertRows(execute("SELECT * FROM %s WHERE ck = 2"), row(1, 2, 3)); + } + + @Test + public void testIndexOnClusteringKeyOverridingDeletedRow() throws Throwable + { + createTable("CREATE TABLE %s (pk int, ck 
int, v int, PRIMARY KEY (pk, ck))"); + createIndex("CREATE INDEX ON %s(ck)"); + + execute("INSERT INTO %s(pk, ck, v) VALUES (1, 2, 3)"); + execute("DELETE FROM %s WHERE pk = 1 AND ck = 2"); + execute("UPDATE %s SET v = 3 WHERE pk = 1 AND ck = 2"); + + assertRows(execute("SELECT * FROM %s WHERE pk = 1 AND ck = 2"), row(1, 2, 3)); + assertRows(execute("SELECT * FROM %s WHERE ck = 2"), row(1, 2, 3)); + } + + @Test public void testIndexOnRegularColumnInsertExpiringColumn() throws Throwable { + testIndexOnRegularColumnInsertExpiringColumn(false); + } + + @Test + public void testIndexOnRegularColumnInsertExpiringColumnWithFlush() throws Throwable + { + testIndexOnRegularColumnInsertExpiringColumn(true); + } + + private void testIndexOnRegularColumnInsertExpiringColumn(boolean flushBeforeUpdate) throws Throwable + { createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))"); - createIndex("CREATE INDEX on %s(a)"); + createIndex("CREATE INDEX ON %s(a)"); execute("INSERT INTO %s (pk, ck, a, b) VALUES (1, 2, 3, 4)"); assertRows(execute("SELECT * FROM %s WHERE a = 3"), row(1, 2, 3, 4)); + if (flushBeforeUpdate) + flush(); + execute("UPDATE %s USING TTL 1 SET b = 10 WHERE pk = 1 AND ck = 2"); Thread.sleep(1000); assertRows(execute("SELECT * FROM %s WHERE a = 3"), row(1, 2, 3, null)); + if (flushBeforeUpdate) + flush(); + execute("UPDATE %s USING TTL 1 SET a = 5 WHERE pk = 1 AND ck = 2"); Thread.sleep(1000); assertEmpty(execute("SELECT * FROM %s WHERE a = 3")); @@ -1275,6 +1380,35 @@ public class SecondaryIndexTest extends CQLTester } @Test + public void testIndexOnRegularColumnOverridingExpiredRow() throws Throwable + { + createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + createIndex("CREATE INDEX ON %s(v)"); + + execute("UPDATE %s USING TTL 1 SET v = 3 WHERE pk = 1 AND ck = 2"); + Thread.sleep(1000); + + assertEmpty(execute("SELECT * FROM %s WHERE v = 3")); + + execute("UPDATE %s SET v = 3 WHERE pk = 1 AND ck = 2"); + 
assertRows(execute("SELECT * FROM %s WHERE v = 3"), row(1, 2, 3)); + } + + @Test + public void testIndexOnRegularColumnOverridingDeletedRow() throws Throwable + { + createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + createIndex("CREATE INDEX ON %s(v)"); + + execute("INSERT INTO %s(pk, ck, v) VALUES (1, 2, 3)"); + execute("DELETE FROM %s WHERE pk=1 AND ck=2"); + execute("UPDATE %s SET v=3 WHERE pk=1 AND ck=2"); + + assertRows(execute("SELECT * FROM %s WHERE pk=1 AND ck=2"), row(1, 2, 3)); + assertRows(execute("SELECT * FROM %s WHERE v=3"), row(1, 2, 3)); + } + + @Test public void testIndicesOnCompactTable() throws Throwable { assertInvalidMessage("COMPACT STORAGE with composite PRIMARY KEY allows no more than one column not part of the PRIMARY KEY (got: v1, v2)", --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org