Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 95839aae2 -> 68a67469f


Implement short read protection on partition boundaries

patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for
CASSANDRA-13595


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/68a67469
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/68a67469
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/68a67469

Branch: refs/heads/cassandra-3.0
Commit: 68a67469f8b25534d086b29b8fe0fa4ec3f9d1ec
Parents: 1efdf33
Author: Aleksey Yeschenko <alek...@yeschenko.com>
Authored: Thu Sep 21 14:29:05 2017 +0100
Committer: Aleksey Yeschenko <alek...@yeschenko.com>
Committed: Sat Sep 30 10:33:30 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/PartitionRangeReadCommand.java |  14 ++
 .../org/apache/cassandra/db/ReadCommand.java    |   2 +
 .../db/SinglePartitionReadCommand.java          |   5 +
 .../apache/cassandra/db/filter/DataLimits.java  |   5 +-
 .../apache/cassandra/service/DataResolver.java  | 216 ++++++++++++++-----
 6 files changed, 180 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/68a67469/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1c53aa5..c5e54d4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Implement short read protection on partition boundaries (CASSANDRA-13595)
  * Fix ISE thrown by UPI.Serializer.hasNext() for some SELECT queries (CASSANDRA-13911)
  * Filter header only commit logs before recovery (CASSANDRA-13918)
  * AssertionError prepending to a list (CASSANDRA-13149)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68a67469/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 9e557e0..84e3c7d 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.BaseRowIterator;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -400,6 +401,19 @@ public class PartitionRangeReadCommand extends ReadCommand
         return DataRange.serializer.serializedSize(dataRange(), version, metadata());
     }
 
+    /*
+     * We are currently using PartitionRangeReadCommand for most index queries, even if they are explicitly restricted
+     * to a single partition key. Return true if that is the case.
+     *
+     * See CASSANDRA-11617 and CASSANDRA-11872 for details.
+     */
+    public boolean isLimitedToOnePartition()
+    {
+        return dataRange.keyRange instanceof Bounds
+            && dataRange.startKey().kind() == PartitionPosition.Kind.ROW_KEY
+            && dataRange.startKey().equals(dataRange.stopKey());
+    }
+
     private static class Deserializer extends SelectionDeserializer
     {
         public ReadCommand deserialize(DataInputPlus in,

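For context on the check above: secondary index queries that are restricted to a single partition key are still planned as PartitionRangeReadCommands whose key range collapses to a single decorated key (CASSANDRA-11617, CASSANDRA-11872). A minimal stand-alone sketch of the same three-part test, using hypothetical stand-in types rather than the real DataRange/PartitionPosition classes:

    // Illustration only: simplified stand-ins for Cassandra's key range types.
    public class SinglePartitionRangeCheckSketch
    {
        enum Kind { ROW_KEY, MIN_BOUND, MAX_BOUND }  // stand-in for PartitionPosition.Kind

        static final class Position
        {
            final Kind kind;
            final String key; // stand-in for a decorated partition key

            Position(Kind kind, String key) { this.kind = kind; this.key = key; }
        }

        // Mirrors isLimitedToOnePartition(): the range must be inclusive on both ends
        // (a Bounds), anchored on an actual row key, and start and stop must be equal.
        static boolean isLimitedToOnePartition(boolean isInclusiveBounds, Position start, Position stop)
        {
            return isInclusiveBounds
                && start.kind == Kind.ROW_KEY
                && start.key.equals(stop.key);
        }

        public static void main(String[] args)
        {
            Position key = new Position(Kind.ROW_KEY, "pk1");
            Position min = new Position(Kind.MIN_BOUND, "");
            Position max = new Position(Kind.MAX_BOUND, "");

            System.out.println(isLimitedToOnePartition(true, key, key)); // true: indexed single-partition query
            System.out.println(isLimitedToOnePartition(true, min, max)); // false: full range scan
        }
    }
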
http://git-wip-us.apache.org/repos/asf/cassandra/blob/68a67469/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 160b104..2d399d8 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -170,6 +170,8 @@ public abstract class ReadCommand implements ReadQuery
     protected abstract void serializeSelection(DataOutputPlus out, int version) throws IOException;
     protected abstract long selectionSerializedSize(int version);
 
+    public abstract boolean isLimitedToOnePartition();
+
     /**
      * The metadata for the table queried.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68a67469/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 7a66eca..4b10530 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -1096,6 +1096,11 @@ public class SinglePartitionReadCommand extends ReadCommand
              + ClusteringIndexFilter.serializer.serializedSize(clusteringIndexFilter(), version);
     }
 
+    public boolean isLimitedToOnePartition()
+    {
+        return true;
+    }
+
     /**
      * Groups multiple single partition read commands.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68a67469/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 6b74293..4c57a76 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -312,10 +312,7 @@ public abstract class DataLimits
 
         public DataLimits forShortReadRetry(int toFetch)
         {
-            // When we do a short read retry, we're only ever querying the 
single partition on which we have a short read. So
-            // we use toFetch as the row limit and use no perPartitionLimit 
(it would be equivalent in practice to use toFetch
-            // for both argument or just for perPartitionLimit with no limit 
on rowLimit).
-            return new CQLLimits(toFetch, NO_LIMIT, isDistinct);
+            return new CQLLimits(toFetch, perPartitionLimit, isDistinct);
         }
 
         public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)

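In terms of the public DataLimits API, the change above can be illustrated as follows (a sketch only; it assumes the DataLimits.cqlLimits(rowLimit, perPartitionLimit) factory, and 100/10/37 are arbitrary example values):

    import org.apache.cassandra.db.filter.DataLimits;

    // Sketch; assumes the public DataLimits.cqlLimits(rowLimit, perPartitionLimit) factory.
    public class ShortReadRetryLimitsSketch
    {
        public static void main(String[] args)
        {
            // original query limits: LIMIT 100, with a per-partition limit of 10
            DataLimits original = DataLimits.cqlLimits(100, 10);

            // a short read retry that still needs 37 rows
            DataLimits retry = original.forShortReadRetry(37);

            System.out.println(retry.count());             // 37: rows we still want to fetch
            System.out.println(retry.perPartitionCount()); // 10: now preserved (was NO_LIMIT before this change),
                                                           // since the retried range may span several partitions
        }
    }
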
http://git-wip-us.apache.org/repos/asf/cassandra/blob/68a67469/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 7d8ffc5..5fb34c6 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -34,6 +34,9 @@ import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.ExcludingBounds;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.tracing.Tracing;
@@ -91,7 +94,7 @@ public class DataResolver extends ResponseResolver
          * have more rows than the client requested. To make sure that we still conform to the original limit,
          * we apply a top-level post-reconciliation counter to the merged partition iterator.
          *
-         * Short read protection logic (ShortReadRowProtection.moreContents()) relies on this counter to be applied
+         * Short read protection logic (ShortReadRowsProtection.moreContents()) relies on this counter to be applied
          * to the current partition to work. For this reason we have to apply the counter transformation before
          * empty partition discard logic kicks in - for it will eagerly consume the iterator.
          *
@@ -121,26 +124,13 @@ public class DataResolver extends ResponseResolver
         if (results.size() == 1)
             return results.get(0);
 
-        // So-called "short reads" stems from nodes returning only a subset of 
the results they have for a partition due to the limit,
-        // but that subset not being enough post-reconciliation. So if we 
don't have limit, don't bother.
+        /*
+         * So-called short reads stem from nodes returning only a subset of the results they have due to the limit,
+         * but that subset not being enough post-reconciliation. So if we don't have a limit, don't bother.
+         */
         if (!command.limits().isUnlimited())
-        {
             for (int i = 0; i < results.size(); i++)
-            {
-                DataLimits.Counter singleResultCounter =
-                    command.limits().newCounter(command.nowInSec(), false, 
command.selectsFullPartition(), enforceStrictLiveness).onlyCount();
-
-                ShortReadResponseProtection protection =
-                    new ShortReadResponseProtection(sources[i], 
singleResultCounter, mergedResultCounter);
-
-                /*
-                 * The order of transformations is important here. See 
ShortReadResponseProtection.applyToPartition()
-                 * comments for details. We want 
singleResultCounter.applyToPartition() to be called after SRRP applies
-                 * its transformations, so that this order is preserved when 
calling applyToRows() too.
-                 */
-                results.set(i, 
Transformation.apply(Transformation.apply(results.get(i), protection), 
singleResultCounter));
-            }
-        }
+                results.set(i, extendWithShortReadProtection(results.get(i), sources[i], mergedResultCounter));
 
         return UnfilteredPartitionIterators.merge(results, command.nowInSec(), new RepairMergeListener(sources));
     }
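
A toy illustration of the situation the per-source wrapping above guards against, using plain collections rather than the real counters and iterators: one replica stops at its limit, reconciliation discards some of what it sent, and the merged result comes up short.

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Toy model only. The map holds partitions returned by one replica, and whether each
    // survives reconciliation (false = shadowed by a tombstone known only to another replica).
    public class ShortReadScenarioSketch
    {
        public static void main(String[] args)
        {
            int limit = 2; // SELECT ... LIMIT 2

            Map<String, Boolean> fromReplicaA = new LinkedHashMap<>();
            fromReplicaA.put("p1", false); // deleted according to replica B
            fromReplicaA.put("p2", true);
            // replica A also holds p3, but stopped at the limit of 2

            int countedForSource = fromReplicaA.size(); // 2: A's own counter is "done"
            long countedAfterMerge = fromReplicaA.values().stream().filter(v -> v).count(); // 1

            // The merged result is short even though A hit its limit, so we must go back
            // to A for more partitions past p2 -- which is what the protection below does.
            boolean needMoreFromA = countedForSource >= limit && countedAfterMerge < limit;
            System.out.println(needMoreFromA); // true
        }
    }
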
@@ -476,14 +466,60 @@ public class DataResolver extends ResponseResolver
         }
     }
 
-    private class ShortReadResponseProtection extends 
Transformation<UnfilteredRowIterator>
+    private UnfilteredPartitionIterator extendWithShortReadProtection(UnfilteredPartitionIterator partitions,
+                                                                      InetAddress source,
+                                                                      DataLimits.Counter mergedResultCounter)
+    {
+        DataLimits.Counter singleResultCounter =
+            command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition(), enforceStrictLiveness).onlyCount();
+
+        ShortReadPartitionsProtection protection =
+            new ShortReadPartitionsProtection(source, singleResultCounter, mergedResultCounter);
+
+        /*
+         * The order of extension and transformations is important here. Extending with more partitions has to happen
+         * first due to the way BaseIterator.hasMoreContents() works: only transformations applied after extension will
+         * be called on the first partition of the extended iterator.
+         *
+         * Additionally, we want singleResultCounter to be applied after SRPP, so that its applyToPartition() method will
+         * be called last, after the extension done by the SRPP.applyToPartition() call. That way we preserve the same order
+         * when it comes to calling SRPP.moreContents() and applyToRow() callbacks.
+         *
+         * See ShortReadPartitionsProtection.applyToPartition() for more details.
+         */
+
+        // extend with moreContents() only if it's a range read command with no partition key specified
+        if (!command.isLimitedToOnePartition())
+            partitions = MorePartitions.extend(partitions, protection);     // register SRPP.moreContents()
+
+        partitions = Transformation.apply(partitions, protection);          // register SRPP.applyToPartition()
+        partitions = Transformation.apply(partitions, singleResultCounter); // register the per-source counter
+
+        return partitions;
+    }
+
+    /*
+     * We have a potential short read if the result from a given node contains the requested number of rows
+     * (i.e. it has stopped returning results due to the limit), but some of them haven't
+     * made it into the final post-reconciliation result due to other nodes' row, range, and/or partition tombstones.
+     *
+     * If that is the case, then that node may have more rows that we should fetch, as otherwise we could
+     * ultimately return fewer rows than required. Also, those additional rows may contain tombstones
+     * which we also need to fetch as they may shadow rows or partitions from other replicas' results, which we would
+     * otherwise return incorrectly.
+     */
+    private class ShortReadPartitionsProtection extends Transformation<UnfilteredRowIterator> implements MorePartitions<UnfilteredPartitionIterator>
     {
         private final InetAddress source;
 
         private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
         private final DataLimits.Counter mergedResultCounter; // merged end-result counter
 
-        private ShortReadResponseProtection(InetAddress source, DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
+        private DecoratedKey lastPartitionKey; // key of the last observed partition
+
+        private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call
+
+        private ShortReadPartitionsProtection(InetAddress source, DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
         {
             this.source = source;
             this.singleResultCounter = singleResultCounter;
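
The ordering constraint described in extendWithShortReadProtection() can be shown with plain Java streams (a toy sketch, unrelated to the real transform framework): an operation applied after the extension also covers the extension's elements, one applied before it does not.

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class ExtendThenTransformSketch
    {
        public static void main(String[] args)
        {
            List<String> original  = Arrays.asList("p1", "p2");
            List<String> extension = Arrays.asList("p3");  // think: partitions fetched by moreContents()

            // "counter" applied before extending: the extension escapes it
            List<String> wrongOrder = Stream.concat(
                    original.stream().map(String::toUpperCase),   // only p1, p2 are covered
                    extension.stream())
                .collect(Collectors.toList());                    // [P1, P2, p3]

            // "counter" applied after extending: the extension is covered as well
            List<String> rightOrder = Stream.concat(original.stream(), extension.stream())
                .map(String::toUpperCase)                         // p1, p2 and p3 are all covered
                .collect(Collectors.toList());                    // [P1, P2, P3]

            System.out.println(wrongOrder);
            System.out.println(rightOrder);
        }
    }
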
@@ -493,29 +529,100 @@ public class DataResolver extends ResponseResolver
         @Override
         public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
         {
-            ShortReadRowProtection protection = new 
ShortReadRowProtection(partition.metadata(), partition.partitionKey());
+            partitionsFetched = true;
+
+            lastPartitionKey = partition.partitionKey();
 
             /*
-             * Extend for moreContents() then apply protection to track lastClustering.
+             * Extend for moreContents() then apply protection to track lastClustering by applyToRow().
              *
              * If we don't apply the transformation *after* extending the partition with MoreRows,
              * applyToRow() method of protection will not be called on the first row of the new extension iterator.
              */
+            ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.metadata(), partition.partitionKey());
             return Transformation.apply(MoreRows.extend(partition, protection), protection);
         }
 
-        private class ShortReadRowProtection extends Transformation implements 
MoreRows<UnfilteredRowIterator>
+        /*
+         * We only get here once all the rows and partitions in this iterator have been iterated over, and so
+         * if the node had returned the requested number of rows but we still get here, then some results were
+         * skipped during reconciliation.
+         */
+        public UnfilteredPartitionIterator moreContents()
+        {
+            // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit
+            assert !mergedResultCounter.isDone();
+
+            // we do not apply short read protection when we have no limits at all
+            assert !command.limits().isUnlimited();
+
+            /*
+             * If this is a single partition read command or an (indexed) partition range read command with
+             * a partition key specified, then we can't and shouldn't try to fetch more partitions.
+             */
+            assert !command.isLimitedToOnePartition();
+
+            /*
+             * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more.
+             *
+             * We can only take this shortcut if there is no per-partition limit set. Otherwise it's possible to hit false
+             * positives due to some rows being unaccounted for in certain scenarios (see CASSANDRA-13911).
+             */
+            if (!singleResultCounter.isDone() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
+                return null;
+
+            /*
+             * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator.
+             * There is no point in asking the replica for more rows - it has no more in the requested range.
+             */
+            if (!partitionsFetched)
+                return null;
+            partitionsFetched = false;
+
+            /*
+             * We are going to fetch one partition at a time for thrift and potentially more for CQL.
+             * The row limit will either be set to the per-partition limit - if the command has no total row limit set, or
+             * to the total # of rows remaining - if it has one. If we don't grab enough rows in some of the partitions,
+             * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones.
+             */
+            int toQuery = command.limits().count() != DataLimits.NO_LIMIT
+                        ? command.limits().count() - mergedResultCounter.counted()
+                        : command.limits().perPartitionCount();
+
+            ColumnFamilyStore.metricsFor(command.metadata().cfId).shortReadProtectionRequests.mark();
+            Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
+
+            PartitionRangeReadCommand cmd = makeFetchAdditionalPartitionReadCommand(toQuery);
+            return executeReadCommand(cmd);
+        }
+
+        private PartitionRangeReadCommand makeFetchAdditionalPartitionReadCommand(int toQuery)
+        {
+            PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command;
+
+            DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery);
+
+            AbstractBounds<PartitionPosition> bounds = cmd.dataRange().keyRange();
+            AbstractBounds<PartitionPosition> newBounds = bounds.inclusiveRight()
+                                                        ? new Range<>(lastPartitionKey, bounds.right)
+                                                        : new ExcludingBounds<>(lastPartitionKey, bounds.right);
+            DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);
+
+            return cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange);
+        }
+
+        private class ShortReadRowsProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
         {
             private final CFMetaData metadata;
             private final DecoratedKey partitionKey;
 
-            private Clustering lastClustering;
+            private Clustering lastClustering; // clustering of the last observed row
 
             private int lastCounted = 0; // last seen recorded # before attempting to fetch more rows
             private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command)
             private int lastQueried = 0; // # extra rows requested from the replica last time
 
-            private ShortReadRowProtection(CFMetaData metadata, DecoratedKey partitionKey)
+            private ShortReadRowsProtection(CFMetaData metadata, DecoratedKey partitionKey)
             {
                 this.metadata = metadata;
                 this.partitionKey = partitionKey;
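
makeFetchAdditionalPartitionReadCommand() above re-issues the range query over only the part of the key range that has not been consumed yet. A small stand-alone sketch of that interval arithmetic, with strings standing in for decorated keys (the two branches correspond to Range<> and ExcludingBounds<> in the real code):

    // Illustration only: compute the textual form of the remaining key range.
    public class RemainingRangeSketch
    {
        static String remainder(String lastPartitionKey, String right, boolean inclusiveRight)
        {
            // The left end always excludes the last key we saw, since that partition has
            // already been consumed; the right end keeps the original range's inclusiveness.
            return inclusiveRight
                 ? "(" + lastPartitionKey + ", " + right + "]"   // -> new Range<>(lastPartitionKey, right)
                 : "(" + lastPartitionKey + ", " + right + ")";  // -> new ExcludingBounds<>(lastPartitionKey, right)
        }

        public static void main(String[] args)
        {
            System.out.println(remainder("k42", "maxKey", true));  // (k42, maxKey]
            System.out.println(remainder("k42", "maxKey", false)); // (k42, maxKey)
        }
    }
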
@@ -529,18 +636,9 @@ public class DataResolver extends ResponseResolver
             }
 
             /*
-             * We have a potential short read if the result from a given node 
contains the requested number of rows
-             * for that partition (i.e. it has stopped returning results due 
to the limit), but some of them haven't
-             * made it into the final post-reconciliation result due to other 
nodes' tombstones.
-             *
-             * If that is the case, then that node may have more rows that we 
should fetch, as otherwise we could
-             * ultimately return fewer rows than required. Also, those 
additional rows may contain tombstones which
-             * which we also need to fetch as they may shadow rows from other 
replicas' results, which we would
-             * otherwise return incorrectly.
-             *
-             * Also note that we only get here once all the rows for this 
partition have been iterated over, and so
-             * if the node had returned the requested number of rows but we 
still get here, then some results were
-             * skipped during reconciliation.
+             * We only get here once all the rows in this iterator have been iterated over, and so if the node
+             * had returned the requested number of rows but we still get here, then some results were skipped
+             * during reconciliation.
              */
             public UnfilteredRowIterator moreContents()
             {
@@ -622,18 +720,17 @@ public class DataResolver extends ResponseResolver
                 * Note: it's ok to retrieve more rows than necessary since singleResultCounter is not stopping and only
                 * counts.
                  *
-                 * With that in mind, we'll just request the minimum of 
(count(), perPartitionCount()) limits,
-                 * but no fewer than 8 rows (an arbitrary round lower bound), 
to ensure that we won't fetch row by row
-                 * for SELECT DISTINCT queries (that set per partition limit 
to 1).
+                 * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits.
                  *
                  * See CASSANDRA-13794 for more details.
                  */
-                lastQueried = Math.max(Math.min(command.limits().count(), 
command.limits().perPartitionCount()), 8);
+                lastQueried = Math.min(command.limits().count(), command.limits().perPartitionCount());
 
                 
ColumnFamilyStore.metricsFor(metadata.cfId).shortReadProtectionRequests.mark();
                 Tracing.trace("Requesting {} extra rows from {} for short read 
protection", lastQueried, source);
 
-                return 
executeReadCommand(makeFetchAdditionalRowsReadCommand(lastQueried));
+                SinglePartitionReadCommand cmd = 
makeFetchAdditionalRowsReadCommand(lastQueried);
+                return 
UnfilteredPartitionIterators.getOnlyElement(executeReadCommand(cmd), cmd);
             }
 
             private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery)
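
The new fetch size above is simply the smaller of the two limits, with the old floor of 8 rows removed. Spelled out as arithmetic (a sketch; 100 and 10 are arbitrary example limits):

    public class LastQueriedSketch
    {
        public static void main(String[] args)
        {
            // LIMIT 100 with a per-partition limit of 10: ask for at most 10 extra rows per retry
            System.out.println(Math.min(100, 10));              // 10

            // SELECT DISTINCT sets the per-partition limit to 1
            System.out.println(Math.min(100, 1));               // 1 with this change
            System.out.println(Math.max(Math.min(100, 1), 8));  // 8 with the old lower bound
        }
    }
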
@@ -649,24 +746,25 @@ public class DataResolver extends ResponseResolver
                                                          command.rowFilter(),
                                                          command.limits().forShortReadRetry(toQuery),
                                                          partitionKey,
-                                                         filter);
+                                                         filter,
+                                                         command.indexMetadata());
             }
+        }
 
-            private UnfilteredRowIterator 
executeReadCommand(SinglePartitionReadCommand cmd)
-            {
-                DataResolver resolver = new DataResolver(keyspace, cmd, 
ConsistencyLevel.ONE, 1);
-                ReadCallback handler = new ReadCallback(resolver, 
ConsistencyLevel.ONE, cmd, Collections.singletonList(source));
-
-                if (StorageProxy.canDoLocalRequest(source))
-                    
StageManager.getStage(Stage.READ).maybeExecuteImmediately(new 
StorageProxy.LocalReadRunnable(cmd, handler));
-                else
-                    
MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version),
 source, handler);
-
-                // We don't call handler.get() because we want to preserve 
tombstones since we're still in the middle of merging node results.
-                handler.awaitResults();
-                assert resolver.responses.size() == 1;
-                return 
UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command),
 cmd);
-            }
+        private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd)
+        {
+            DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1);
+            ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source));
+
+            if (StorageProxy.canDoLocalRequest(source))
+                StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
+            else
+                MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler);
+
+            // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
+            handler.awaitResults();
+            assert resolver.responses.size() == 1;
+            return resolver.responses.get(0).payload.makeIterator(command);
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org
