This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new 092915a  Don't skip sstables with partition deletes
092915a is described below

commit 092915ad19d25c1ba93f7968210b88fb6e4b9180
Author: Sam Tunnicliffe <s...@beobal.com>
AuthorDate: Tue Apr 7 16:35:50 2020 +0100

    Don't skip sstables with partition deletes
    
    Patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for CASSANDRA-15690
---
 CHANGES.txt                                        |   1 +
 .../cassandra/db/SinglePartitionReadCommand.java   | 103 +++++++++------------
 .../distributed/test/SimpleReadWriteTest.java      | 101 ++++++++++++++++++++
 3 files changed, 148 insertions(+), 57 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 5af0ef3..91b8241 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.21
+ * Don't skip sstables in slice queries based only on local min/max/deletion 
timestamp (CASSANDRA-15690)
  * Memtable memory allocations may deadlock (CASSANDRA-15367)
  * Run evictFromMembership in GossipStage (CASSANDRA-15592)
 Merged from 2.2:
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java 
b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 15b74d8..2e014ba 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -705,92 +705,81 @@ public class SinglePartitionReadCommand extends 
ReadCommand
              * We can't eliminate full sstables based on the timestamp of what 
we've already read like
              * in collectTimeOrderedData, but we still want to eliminate 
sstable whose maxTimestamp < mostRecentTombstone
              * we've read. We still rely on the sstable ordering by 
maxTimestamp since if
-             *   maxTimestamp_s1 > maxTimestamp_s0,
+             *   maxTimestamp_s1 < maxTimestamp_s0,
              * we're guaranteed that s1 cannot have a row tombstone such that
              *   timestamp(tombstone) > maxTimestamp_s0
              * since we necessarily have
              *   timestamp(tombstone) <= maxTimestamp_s1
-             * In other words, iterating in maxTimestamp order allow to do our 
mostRecentPartitionTombstone elimination
-             * in one pass, and minimize the number of sstables for which we 
read a partition tombstone.
+             * In other words, iterating in descending maxTimestamp order 
allows us to do our mostRecentPartitionTombstone
+             * elimination in one pass, and minimizes the number of sstables 
for which we read a partition tombstone.
              */
             Collections.sort(view.sstables, 
SSTableReader.maxTimestampComparator);
-            List<SSTableReader> skippedSSTables = null;
             long mostRecentPartitionTombstone = Long.MIN_VALUE;
-            long minTimestamp = Long.MAX_VALUE;
             int nonIntersectingSSTables = 0;
+            int includedDueToTombstones = 0;
             SSTableReadMetricsCollector metricsCollector = new 
SSTableReadMetricsCollector();
 
             for (SSTableReader sstable : view.sstables)
             {
-                minTimestamp = Math.min(minTimestamp, 
sstable.getMinTimestamp());
                 // if we've already seen a partition tombstone with a 
timestamp greater
                 // than the most recent update to this sstable, we can skip it
                 if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
                     break;
 
-                if (!shouldInclude(sstable))
-                {
-                    nonIntersectingSSTables++;
-                    // sstable contains no tombstone if maxLocalDeletionTime 
== Integer.MAX_VALUE, so we can safely skip those entirely
-                    if (sstable.hasTombstones())
-                    {
-                        if (skippedSSTables == null)
-                            skippedSSTables = new ArrayList<>();
-                        skippedSSTables.add(sstable);
-                    }
-                    continue;
-                }
-
-                if (!sstable.isRepaired())
-                    oldestUnrepairedTombstone = 
Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
-
-                // 'iter' is added to iterators which is closed on exception, 
or through the closing of the final merged iterator
-                @SuppressWarnings("resource")
-                UnfilteredRowIterator iter = filter.filter(
-                    sstable.iterator(partitionKey(),
-                                     columnFilter(),
-                                     filter.isReversed(),
-                                     isForThrift(),
-                                     metricsCollector)
-                );
-
-                if (isForThrift())
-                    iter = ThriftResultsMerger.maybeWrap(iter, nowInSec());
-
-                iterators.add(RTBoundValidator.validate(iter, 
RTBoundValidator.Stage.SSTABLE, false));
-
-                mostRecentPartitionTombstone = 
Math.max(mostRecentPartitionTombstone, 
iter.partitionLevelDeletion().markedForDeleteAt());
-            }
-
-            int includedDueToTombstones = 0;
-            // Check for partition tombstones in the skipped sstables
-            if (skippedSSTables != null)
-            {
-                for (SSTableReader sstable : skippedSSTables)
+                if (shouldInclude(sstable))
                 {
-                    if (sstable.getMaxTimestamp() <= minTimestamp)
-                        continue;
+                    if (!sstable.isRepaired())
+                        oldestUnrepairedTombstone = 
Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
 
-                    @SuppressWarnings("resource") // 'iter' is either closed 
right away, or added to iterators which is close on exception, or through the 
closing of the final merged iterator
+                    // 'iter' is added to iterators which is closed on 
exception, or through the closing of the final merged iterator
+                    @SuppressWarnings("resource")
                     UnfilteredRowIterator iter = 
filter.filter(sstable.iterator(partitionKey(),
                                                                                
 columnFilter(),
                                                                                
 filter.isReversed(),
                                                                                
 isForThrift(),
                                                                                
 metricsCollector));
 
-                    if (iter.partitionLevelDeletion().markedForDeleteAt() > 
minTimestamp)
-                    {
-                        iterators.add(iter);
-                        if (!sstable.isRepaired())
-                            oldestUnrepairedTombstone = 
Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
-                        includedDueToTombstones++;
-                    }
-                    else
+                    if (isForThrift())
+                        iter = ThriftResultsMerger.maybeWrap(iter, nowInSec());
+
+                    iterators.add(RTBoundValidator.validate(iter, 
RTBoundValidator.Stage.SSTABLE, false));
+                    mostRecentPartitionTombstone = 
Math.max(mostRecentPartitionTombstone,
+                                                            
iter.partitionLevelDeletion().markedForDeleteAt());
+                }
+                else
+                {
+
+                    nonIntersectingSSTables++;
+                    // sstable contains no tombstone if maxLocalDeletionTime 
== Integer.MAX_VALUE, so we can safely skip those entirely
+                    if (sstable.hasTombstones())
                     {
-                        iter.close();
+                        // 'iter' is either closed right away, or added to iterators which is closed on 
exception, or through the closing of the final merged iterator
+                        @SuppressWarnings("resource")
+                        UnfilteredRowIterator iter = 
filter.filter(sstable.iterator(partitionKey(),
+                                                                               
     columnFilter(),
+                                                                               
     filter.isReversed(),
+                                                                               
     isForThrift(),
+                                                                               
     metricsCollector));
+                        // if the sstable contains a partition delete, then we 
must include it regardless of whether it
+                        // shadows any other data seen locally as we can't 
guarantee that other replicas have seen it
+                        if (!iter.partitionLevelDeletion().isLive())
+                        {
+                            includedDueToTombstones++;
+                            iterators.add(RTBoundValidator.validate(iter, 
RTBoundValidator.Stage.SSTABLE, false));
+                            if (!sstable.isRepaired())
+                                oldestUnrepairedTombstone = 
Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+                            mostRecentPartitionTombstone = 
Math.max(mostRecentPartitionTombstone,
+                                                                    
iter.partitionLevelDeletion().markedForDeleteAt());
+                        }
+                        else
+                        {
+                            iter.close();
+                        }
+
                     }
                 }
             }
+
             if (Tracing.isTracing())
                 Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, 
included {} due to tombstones",
                               nonIntersectingSSTables, view.sstables.size(), 
includedDueToTombstones);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index f1f8674..75e5ba9 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@ -1,12 +1,16 @@
 package org.apache.cassandra.distributed.test;
 
+import java.util.Set;
+
 import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 
 import static org.junit.Assert.assertEquals;
 
@@ -269,6 +273,103 @@ public class SimpleReadWriteTest extends 
SharedClusterTestBase
         assertEquals(100, readCount1);
     }
 
+
+    @Test
+    public void skippedSSTableWithPartitionDeletionTest() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY(pk, ck))");
+            // insert a partition tombstone on node 1, the deletion timestamp 
should end up being the sstable's minTimestamp
+            cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl 
USING TIMESTAMP 1 WHERE pk = 0");
+            // and a row from a different partition, to provide the sstable's 
min/max clustering
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 2");
+            cluster.get(1).flush(KEYSPACE);
+            // expect a single sstable, where minTimestamp equals the 
timestamp of the partition delete
+            cluster.get(1).runOnInstance(() -> {
+                Set<SSTableReader> sstables = Keyspace.open(KEYSPACE)
+                                                      
.getColumnFamilyStore("tbl")
+                                                      .getLiveSSTables();
+                assertEquals(1, sstables.size());
+                assertEquals(1, sstables.iterator().next().getMinTimestamp());
+            });
+
+            // on node 2, add a row for the deleted partition with an older 
timestamp than the deletion so it should be shadowed
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
+
+
+            Object[][] rows = cluster.coordinator(1)
+                                     .execute("SELECT * FROM " + KEYSPACE + 
".tbl WHERE pk=0 AND ck > 5",
+                                              ConsistencyLevel.ALL);
+            assertEquals(0, rows.length);
+        }
+    }
+
+    @Test
+    public void 
skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY(pk, ck))");
+            // insert a partition tombstone on node 1, the deletion timestamp 
should end up being the sstable's minTimestamp
+            cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl 
USING TIMESTAMP 1 WHERE pk = 0");
+            // and a row from a different partition, to provide the sstable's 
min/max clustering
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 1");
+            cluster.get(1).flush(KEYSPACE);
+            // sstable 1 has minTimestamp == maxTimestamp == 1 and is skipped 
due to its min/max clusterings. Now we
+            // insert a row which is not shadowed by the partition delete and 
flush to a second sstable. Importantly,
+            // this sstable's minTimestamp is > than the maxTimestamp of the 
first sstable. This would cause the first
+            // sstable not to be reincluded in the merge input, but we can't 
really make that decision as we don't
+            // know what data and/or tombstones are present on other nodes
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2");
+            cluster.get(1).flush(KEYSPACE);
+
+            // on node 2, add a row for the deleted partition with an older 
timestamp than the deletion so it should be shadowed
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
+
+            Object[][] rows = cluster.coordinator(1)
+                                     .execute("SELECT * FROM " + KEYSPACE + 
".tbl WHERE pk=0 AND ck > 5",
+                                              ConsistencyLevel.ALL);
+            // we expect that the row from node 2 (0, 10, 10) was shadowed by 
the partition delete, but the row from
+            // node 1 (0, 6, 6) was not.
+            assertRows(rows, new Object[] {0, 6 ,6});
+        }
+    }
+
+    @Test
+    public void 
skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode2() throws 
Throwable
+    {
+        // skipped sstables must still be added back even when the partition 
delete ts is < the local min ts
+
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY(pk, ck))");
+            // insert a partition tombstone on node 1, the deletion timestamp 
should end up being the sstable's minTimestamp
+            cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl 
USING TIMESTAMP 1 WHERE pk = 0");
+            // and a row from a different partition, to provide the sstable's 
min/max clustering
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 3");
+            cluster.get(1).flush(KEYSPACE);
+            // sstable 1 has minTimestamp == 1 and maxTimestamp == 3, and is skipped 
due to its min/max clusterings. Now we
+            // insert a row which is not shadowed by the partition delete and 
flush to a second sstable. The first sstable
+            // has a maxTimestamp > than the min timestamp of all sstables, so 
it is a candidate for reinclusion to the
+            // merge. However, the second sstable's minTimestamp is > the 
partition delete. This would cause the
+            // first sstable not to be reincluded in the merge input, but we 
can't really make that decision as we don't
+            // know what data and/or tombstones are present on other nodes
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2");
+            cluster.get(1).flush(KEYSPACE);
+
+            // on node 2, add a row for the deleted partition with an older 
timestamp than the deletion so it should be shadowed
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
+
+            Object[][] rows = cluster.coordinator(1)
+                                     .execute("SELECT * FROM " + KEYSPACE + 
".tbl WHERE pk=0 AND ck > 5",
+                                              ConsistencyLevel.ALL);
+            // we expect that the row from node 2 (0, 10, 10) was shadowed by 
the partition delete, but the row from
+            // node 1 (0, 6, 6) was not.
+            assertRows(rows, new Object[] {0, 6 ,6});
+        }
+    }
+
     private long readCount(IInvokableInstance instance)
     {
         return instance.callOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readLatency.latency.getCount());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to