Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 ab5084a52 -> f93e6e340
  refs/heads/cassandra-3.11 c1efaf3a7 -> 2bae4ca90
  refs/heads/trunk 4809f4275 -> 030ec1f05


Fix short read protection performance

patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for
CASSANDRA-13794


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f93e6e34
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f93e6e34
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f93e6e34

Branch: refs/heads/cassandra-3.0
Commit: f93e6e3401c343dec74687d8b079b5697813ab28
Parents: ab5084a
Author: Aleksey Yeschenko <alek...@yeschenko.com>
Authored: Thu Aug 31 20:51:08 2017 +0100
Committer: Aleksey Yeschenko <alek...@yeschenko.com>
Committed: Wed Sep 20 16:11:18 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   5 +
 .../apache/cassandra/service/DataResolver.java  | 272 ++++++++++++-------
 3 files changed, 181 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f93e6e34/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2d11a3e..07742ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Improve short read protection performance (CASSANDRA-13794)
  * Fix sstable reader to support range-tombstone-marker for multi-slices (CASSANDRA-13787)
  * Fix short read protection for tables with no clustering columns (CASSANDRA-13880)
  * Make isBuilt volatile in PartitionUpdate (CASSANDRA-13619)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f93e6e34/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 983d6b1..e6e46b2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2470,4 +2470,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         return keyspace.getColumnFamilyStore(id);
     }
+
+    public static TableMetrics metricsFor(UUID tableId)
+    {
+        return getIfExists(tableId).metric;
+    }
 }
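
The new static helper shortens the metrics lookup that DataResolver previously performed through Schema.instance. A minimal usage sketch follows (the caller class is hypothetical; note that getIfExists() returns null once a table has been dropped, so the helper assumes the table still exists):

    import java.util.UUID;

    import org.apache.cassandra.db.ColumnFamilyStore;

    public class ShortReadMetricsExample
    {
        public static void markShortReadRetry(UUID tableId)
        {
            // Replaces the longer Schema.instance.getColumnFamilyStoreInstance(...)
            // lookup; getIfExists() returns null for a dropped table, in which
            // case this one-liner would throw a NullPointerException.
            ColumnFamilyStore.metricsFor(tableId).shortReadProtectionRequests.mark();
        }
    }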

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f93e6e34/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 99399a3..9a98ee5 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -44,7 +44,7 @@ public class DataResolver extends ResponseResolver
     @VisibleForTesting
     final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
 
-    public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
+    DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
     {
         super(keyspace, command, consistency, maxResponseCount);
     }
@@ -55,6 +55,20 @@ public class DataResolver extends ResponseResolver
         return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec());
     }
 
+    public boolean isDataPresent()
+    {
+        return !responses.isEmpty();
+    }
+
+    public void compareResponses()
+    {
+        // We need to fully consume the results to trigger read repairs if appropriate
+        try (PartitionIterator iterator = resolve())
+        {
+            PartitionIterators.consume(iterator);
+        }
+    }
+
     public PartitionIterator resolve()
     {
        // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
@@ -83,54 +97,56 @@ public class DataResolver extends ResponseResolver
          * See CASSANDRA-13747 for more details.
          */
 
-        DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
+        DataLimits.Counter mergedResultCounter =
+            command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
 
-        UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, counter);
-        FilteredPartitions filtered = FilteredPartitions.filter(merged,
-                                                                new Filter(command.nowInSec(),
-                                                                           command.metadata().enforceStrictLiveness()));
-        PartitionIterator counted = counter.applyTo(filtered);
+        UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, mergedResultCounter);
+        FilteredPartitions filtered =
+            FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
+        PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
 
         return command.isForThrift()
              ? counted
              : Transformation.apply(counted, new EmptyPartitionsDiscarder());
     }
 
-    public void compareResponses()
-    {
-        // We need to fully consume the results to trigger read repairs if appropriate
-        try (PartitionIterator iterator = resolve())
-        {
-            PartitionIterators.consume(iterator);
-        }
-    }
-
     private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
                                                                      InetAddress[] sources,
-                                                                     DataLimits.Counter resultCounter)
+                                                                     DataLimits.Counter mergedResultCounter)
     {
        // If we have only one result, there is no read repair to do and we can't get short reads
         if (results.size() == 1)
             return results.get(0);
 
-        UnfilteredPartitionIterators.MergeListener listener = new RepairMergeListener(sources);
-
        // So-called "short reads" stem from nodes returning only a subset of the results they have for a partition due to the limit,
        // but that subset not being enough post-reconciliation. So if we don't have a limit, don't bother.
         if (!command.limits().isUnlimited())
         {
             for (int i = 0; i < results.size(); i++)
-                results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter)));
+            {
+                DataLimits.Counter singleResultCounter =
+                    command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount();
+
+                ShortReadResponseProtection protection =
+                    new ShortReadResponseProtection(sources[i], singleResultCounter, mergedResultCounter);
+
+                /*
+                 * The order of transformations is important here. See ShortReadResponseProtection.applyToPartition()
+                 * comments for details. We want singleResultCounter.applyToPartition() to be called after SRRP applies
+                 * its transformations, so that this order is preserved when calling applyToRows() too.
+                 */
+                results.set(i, Transformation.apply(Transformation.apply(results.get(i), protection), singleResultCounter));
+            }
         }
 
-        return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener);
+        return UnfilteredPartitionIterators.merge(results, command.nowInSec(), new RepairMergeListener(sources));
     }
 
    private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
     {
         private final InetAddress[] sources;
 
-        public RepairMergeListener(InetAddress[] sources)
+        private RepairMergeListener(InetAddress[] sources)
         {
             this.sources = sources;
         }
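
Two counters with distinct roles are in play in the loop above: mergedResultCounter enforces the user-facing limit on the post-reconciliation result, while each per-source singleResultCounter is created with onlyCount() and never halts iteration - it merely records how many rows that one replica returned. The nesting of the two Transformation.apply() calls is deliberate: protection is applied first (innermost) and the counter second (outermost), so the counter observes exactly the rows that short read protection passes through or appends. A toy model of that wrapping semantics in plain Java (this is not Cassandra's Transformation API, just an illustration of the call order):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.UnaryOperator;

    public class OrderingSketch
    {
        public static void main(String[] args)
        {
            List<String> trace = new ArrayList<>();
            UnaryOperator<String> protection = row -> { trace.add("protect(" + row + ")"); return row; };
            UnaryOperator<String> counter    = row -> { trace.add("count(" + row + ")");   return row; };

            // counter wraps protection, mirroring
            // Transformation.apply(Transformation.apply(iter, protection), counter)
            UnaryOperator<String> pipeline = row -> counter.apply(protection.apply(row));

            pipeline.apply("r1");
            System.out.println(trace); // [protect(r1), count(r1)]
        }
    }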
@@ -209,7 +225,7 @@ public class DataResolver extends ResponseResolver
            // For each source, record if there is an open range to send as repair, and from where.
            private final Slice.Bound[] markerToRepair = new Slice.Bound[sources.length];
 
-            public MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed)
+            private MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed)
             {
                 this.partitionKey = partitionKey;
                 this.columns = columns;
@@ -457,17 +473,18 @@ public class DataResolver extends ResponseResolver
         }
     }
 
-    private class ShortReadProtection extends Transformation<UnfilteredRowIterator>
+    private class ShortReadResponseProtection extends Transformation<UnfilteredRowIterator>
     {
         private final InetAddress source;
-        private final DataLimits.Counter counter;
-        private final DataLimits.Counter postReconciliationCounter;
 
-        private ShortReadProtection(InetAddress source, DataLimits.Counter postReconciliationCounter)
+        private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
+        private final DataLimits.Counter mergedResultCounter; // merged end-result counter
+
+        private ShortReadResponseProtection(InetAddress source, DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
         {
             this.source = source;
-            this.counter = command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount();
-            this.postReconciliationCounter = postReconciliationCounter;
+            this.singleResultCounter = singleResultCounter;
+            this.mergedResultCounter = mergedResultCounter;
         }
 
         @Override
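
The renamed fields make the division of labour explicit: the unmerged per-source counter only observes, while the merged end-result counter (applied in resolve()) is the one that actually stops iteration at the limit. A toy model of the stopping-versus-counting distinction (plain Java iterators, not Cassandra's DataLimits API):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class CounterRolesSketch
    {
        // Stopping counter: cuts iteration off at the limit (like mergedResultCounter).
        static Iterator<String> stopAt(Iterator<String> it, int limit)
        {
            return new Iterator<String>()
            {
                int counted = 0;
                public boolean hasNext() { return counted < limit && it.hasNext(); }
                public String next()     { counted++; return it.next(); }
            };
        }

        // Counting-only counter: observes but never stops (like onlyCount()).
        static Iterator<String> countOnly(Iterator<String> it, int[] counted)
        {
            return new Iterator<String>()
            {
                public boolean hasNext() { return it.hasNext(); }
                public String next()     { counted[0]++; return it.next(); }
            };
        }

        public static void main(String[] args)
        {
            List<String> rows = Arrays.asList("r1", "r2", "r3");
            int[] seen = new int[1];
            Iterator<String> it = stopAt(countOnly(rows.iterator(), seen), 2);
            while (it.hasNext())
                it.next();
            System.out.println(seen[0]); // 2 - the outer stopping counter cut iteration short
        }
    }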
@@ -475,24 +492,25 @@ public class DataResolver extends ResponseResolver
         {
            ShortReadRowProtection protection = new ShortReadRowProtection(partition.metadata(), partition.partitionKey());
 
-            partition = MoreRows.extend(partition, protection); // enable moreContents()
-
             /*
-             * if we don't apply these transformations *after* extending the partition with MoreRows,
-             * their applyToRow() method will not be called on the first row of the new extension iterator
+             * Extend for moreContents() then apply protection to track lastClustering.
+             *
+             * If we don't apply the transformation *after* extending the partition with MoreRows,
+             * applyToRow() method of protection will not be called on the first row of the new extension iterator.
              */
-            partition = Transformation.apply(partition, protection); // track lastClustering
-            partition = Transformation.apply(partition, counter);    // do the counting
-
-            return partition;
+            return Transformation.apply(MoreRows.extend(partition, protection), protection);
         }
 
        private class ShortReadRowProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
         {
-            final CFMetaData metadata;
-            final DecoratedKey partitionKey;
-            Clustering lastClustering;
-            int lastCount = 0;
+            private final CFMetaData metadata;
+            private final DecoratedKey partitionKey;
+
+            private Clustering lastClustering;
+
+            private int lastCounted = 0; // last seen recorded # before attempting to fetch more rows
+            private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command)
+            private int lastQueried = 0; // # extra rows requested from the replica last time
 
            private ShortReadRowProtection(CFMetaData metadata, DecoratedKey partitionKey)
             {
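
The rewritten applyToPartition() above relies on an ordering rule worth spelling out: the protection transformation must be applied after MoreRows.extend(), otherwise its applyToRow() hook never sees the rows produced by moreContents(), and lastClustering would stop being tracked across fetches. A toy model of that pitfall with plain iterators (not Cassandra's MoreRows/Transformation API):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class ExtendThenWrapSketch
    {
        // Toy "extension": keep serving elements from 'more' once 'base' is exhausted.
        static Iterator<String> extend(Iterator<String> base, Iterator<String> more)
        {
            return new Iterator<String>()
            {
                public boolean hasNext() { return base.hasNext() || more.hasNext(); }
                public String next()     { return base.hasNext() ? base.next() : more.next(); }
            };
        }

        // Toy "transformation": record every row that flows past (like tracking lastClustering).
        static Iterator<String> observe(Iterator<String> it, List<String> seen)
        {
            return new Iterator<String>()
            {
                public boolean hasNext() { return it.hasNext(); }
                public String next()     { String row = it.next(); seen.add(row); return row; }
            };
        }

        public static void main(String[] args)
        {
            List<String> seen = new ArrayList<>();
            // Wrap *after* extending, as the patch does: the observer sees the extension rows too.
            Iterator<String> rows = observe(extend(Arrays.asList("r1").iterator(),
                                                   Arrays.asList("r2").iterator()), seen);
            while (rows.hasNext())
                rows.next();
            System.out.println(seen); // [r1, r2] - wrapping before extend() would record only [r1]
        }
    }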
@@ -507,79 +525,139 @@ public class DataResolver extends ResponseResolver
                 return row;
             }
 
-            @Override
+            /*
+             * We have a potential short read if the result from a given node contains the requested number of rows
+             * for that partition (i.e. it has stopped returning results due to the limit), but some of them haven't
+             * made it into the final post-reconciliation result due to other nodes' tombstones.
+             *
+             * If that is the case, then that node may have more rows that we should fetch, as otherwise we could
+             * ultimately return fewer rows than required. Also, those additional rows may contain tombstones which
+             * we also need to fetch as they may shadow rows from other replicas' results, which we would
+             * otherwise return incorrectly.
+             *
+             * Also note that we only get here once all the rows for this partition have been iterated over, and so
+             * if the node had returned the requested number of rows but we still get here, then some results were
+             * skipped during reconciliation.
+             */
             public UnfilteredRowIterator moreContents()
             {
-                assert !postReconciliationCounter.isDoneForPartition();
-
-                // We have a short read if the node this is the result of has returned the requested number of
-                // rows for that partition (i.e. it has stopped returning results due to the limit), but some of
-                // those results haven't made it in the final result post-reconciliation due to other nodes
-                // tombstones. If that is the case, then the node might have more results that we should fetch
-                // as otherwise we might return less results than required, or results that shouldn't be returned
-                // (because the node has tombstone that hides future results from other nodes but that haven't
-                // been returned due to the limit).
-                // Also note that we only get here once all the results for this node have been returned, and so
-                // if the node had returned the requested number but we still get there, it imply some results were
-                // skipped during reconciliation.
-                if (lastCount == counter.counted() || !counter.isDoneForPartition())
+                // never try to request additional rows from replicas if our reconciled partition is already filled to the limit
+                assert !mergedResultCounter.isDoneForPartition();
+
+                // we do not apply short read protection when we have no limits at all
+                assert !command.limits().isUnlimited();
+
+                // if the returned partition doesn't have enough rows to satisfy even the original limit, don't ask for more
+                if (!singleResultCounter.isDoneForPartition())
+                    return null;
+
+                /*
+                 * If the replica has no live rows in the partition, don't try to fetch more.
+                 *
+                 * Note that the previous branch [if (!singleResultCounter.isDoneForPartition()) return null] doesn't
+                 * always cover this scenario:
+                 * isDoneForPartition() is defined as [isDone() || rowInCurrentPartition >= perPartitionLimit],
+                 * and will return true if isDone() returns true, even if there are 0 rows counted in the current partition.
+                 *
+                 * This can happen with a range read if after 1+ rounds of short read protection requests we managed to fetch
+                 * enough extra rows for other partitions to satisfy the singleResultCounter's total row limit, but only
+                 * have tombstones in the current partition.
+                 *
+                 * One other way we can hit this condition is when the partition only has a live static row and no regular
+                 * rows. In that scenario the counter will remain at 0 until the partition is closed - which happens after
+                 * the moreContents() call.
+                 */
+                if (singleResultCounter.countedInCurrentPartition() == 0)
                     return null;
 
-                // clustering of the last row returned is empty, meaning that there is only one row per partition,
-                // and we already have it.
-                if (lastClustering == Clustering.EMPTY)
+                /*
+                 * This is a table with no clustering columns, and has at most one row per partition - with EMPTY clustering.
+                 * We already have the row, so there is no point in asking for more from the partition.
+                 */
+                if (Clustering.EMPTY == lastClustering)
                     return null;
 
-                lastCount = counter.counted();
-
-                // We need to try to query enough additional results to fulfill our query, but because we could still
-                // get short reads on that additional query, just querying the number of results we miss may not be
-                // enough. But we know that when this node answered n rows (counter.countedInCurrentPartition), only
-                // x rows (postReconciliationCounter.countedInCurrentPartition()) made it in the final result.
-                // So our ratio of live rows to requested rows is x/n, so since we miss n-x rows, we estimate that
-                // we should request m rows so that m * x/n = n-x, that is m = (n^2/x) - n.
-                // Also note that it's ok if we retrieve more results that necessary since our top level iterator is a
-                // counting iterator.
-                int n = postReconciliationCounter.countedInCurrentPartition();
-                int x = counter.countedInCurrentPartition();
-                int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
-
-                DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
+                lastFetched = singleResultCounter.countedInCurrentPartition() - lastCounted;
+                lastCounted = singleResultCounter.countedInCurrentPartition();
+
+                // getting back fewer rows than we asked for means the partition on the replica has been fully consumed
+                if (lastQueried > 0 && lastFetched < lastQueried)
+                    return null;
+
+                /*
+                 * At this point we know that:
+                 *     1. the replica returned [repeatedly?] as many rows as we asked for and potentially has more
+                 *        rows in the partition
+                 *     2. at least one of those returned rows was shadowed by a tombstone returned from another
+                 *        replica
+                 *     3. we haven't satisfied the client's limits yet, and should attempt to query for more rows to
+                 *        avoid a short read
+                 *
+                 * In the ideal scenario, we would get exactly min(a, b) or fewer rows from the next request, where a and b
+                 * are defined as follows:
+                 *     [a] limits.count() - mergedResultCounter.counted()
+                 *     [b] limits.perPartitionCount() - mergedResultCounter.countedInCurrentPartition()
+                 *
+                 * It would be naive to query for exactly that many rows, as it's possible and not unlikely
+                 * that some of the returned rows would also be shadowed by tombstones from other hosts.
+                 *
+                 * Note: we don't know, nor do we care, how many rows from the replica made it into the reconciled result;
+                 * we can only tell how many in total we queried for, and that [0, mrc.countedInCurrentPartition()) made it.
+                 *
+                 * In general, our goal should be to minimise the number of extra requests - *not* to minimise the number
+                 * of rows fetched: there is a high transactional cost for every individual request, but a relatively low
+                 * marginal cost for each extra row requested.
+                 *
+                 * As such it's better to overfetch than to underfetch extra rows from a host; but at the same
+                 * time we want to respect paging limits and not blow up spectacularly.
+                 *
+                 * Note: it's ok to retrieve more rows than necessary since singleResultCounter is not stopping and only
+                 * counts.
+                 *
+                 * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits,
+                 * but no fewer than 8 rows (an arbitrary round lower bound), to ensure that we won't fetch row by row
+                 * for SELECT DISTINCT queries (that set per partition limit to 1).
+                 *
+                 * See CASSANDRA-13794 for more details.
+                 */
+                lastQueried = Math.max(Math.min(command.limits().count(), command.limits().perPartitionCount()), 8);
+
+                ColumnFamilyStore.metricsFor(metadata.cfId).shortReadProtectionRequests.mark();
+                Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source);
+
+                return executeReadCommand(makeFetchAdditionalRowsReadCommand(lastQueried));
+            }
+
+            private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery)
+            {
                 ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
-                ClusteringIndexFilter retryFilter = lastClustering == null ? filter : filter.forPaging(metadata.comparator, lastClustering, false);
-                SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(),
-                                                                                   command.nowInSec(),
-                                                                                   command.columnFilter(),
-                                                                                   command.rowFilter(),
-                                                                                   retryLimits,
-                                                                                   partitionKey,
-                                                                                   retryFilter);
-
-                Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
-                Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().cfId).metric.shortReadProtectionRequests.mark();
-
-                return doShortReadRetry(cmd);
+                if (null != lastClustering)
+                    filter = filter.forPaging(metadata.comparator, lastClustering, false);
+
+                return SinglePartitionReadCommand.create(command.metadata(),
+                                                         command.nowInSec(),
+                                                         command.columnFilter(),
+                                                         command.rowFilter(),
+                                                         command.limits().forShortReadRetry(toQuery),
+                                                         partitionKey,
+                                                         filter);
             }
 
-            private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand)
+            private UnfilteredRowIterator executeReadCommand(SinglePartitionReadCommand cmd)
             {
-                DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
-                ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
+                DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1);
+                ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source));
+
                 if (StorageProxy.canDoLocalRequest(source))
-                    StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+                    StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
                 else
-                    MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
+                    MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler);
 
                 // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
                 handler.awaitResults();
                 assert resolver.responses.size() == 1;
-                return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command), retryCommand);
+                return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command), cmd);
             }
         }
     }
-
-    public boolean isDataPresent()
-    {
-        return !responses.isEmpty();
-    }
 }
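
To make the performance fix concrete, here is a worked comparison of the two retry-sizing heuristics, with illustrative numbers (not taken from the patch). The old formula m = (n^2/x) - n degenerates at both extremes: when nearly every row survives reconciliation it requests one extra row per round trip, and when very few survive it can grossly over-request; the new heuristic issues a single adequately sized request instead:

    public class RetrySizeComparison
    {
        public static void main(String[] args)
        {
            // Old heuristic: the replica returned n rows, of which x survived
            // reconciliation; request m rows such that m * x/n = n - x.
            int n = 100;
            int x = 99; // nearly everything survived
            int oldToQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
            System.out.println(oldToQuery); // 1 -> row-by-row fetching, one round trip per row

            x = 2; // almost nothing survived
            oldToQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
            System.out.println(oldToQuery); // 4900 -> one grossly oversized request

            // New heuristic: request min(count(), perPartitionCount()), floored at 8
            // so that SELECT DISTINCT (per-partition limit 1) is not fetched row by row.
            int count = 100, perPartitionCount = 100;
            int lastQueried = Math.max(Math.min(count, perPartitionCount), 8);
            System.out.println(lastQueried); // 100
        }
    }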

