Merge branch 'cassandra-3.0' into cassandra-3.11

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2bae4ca9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2bae4ca9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2bae4ca9

Branch: refs/heads/trunk
Commit: 2bae4ca907ac4d2ab53c899e5cf5c9e4de631f52
Parents: c1efaf3 f93e6e3
Author: Aleksey Yeschenko <alek...@yeschenko.com>
Authored: Wed Sep 20 17:41:07 2017 +0100
Committer: Aleksey Yeschenko <alek...@yeschenko.com>
Committed: Wed Sep 20 17:41:07 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   5 +
 .../apache/cassandra/service/DataResolver.java  | 304 +++++++++++--------
 3 files changed, 187 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bae4ca9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 39270e5,07742ef..8d07cbc
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,15 -1,5 +1,16 @@@
 -3.0.15
 +3.11.1
 + * AbstractTokenTreeBuilder#serializedSize returns wrong value when there is a single leaf and overflow collisions (CASSANDRA-13869)
 + * Add a compaction option to TWCS to ignore sstables overlapping checks (CASSANDRA-13418)
 + * BTree.Builder memory leak (CASSANDRA-13754)
 + * Revert CASSANDRA-10368 of supporting non-pk column filtering due to correctness (CASSANDRA-13798)
 + * Fix cassandra-stress hang issues when an error during cluster connection happens (CASSANDRA-12938)
 + * Better bootstrap failure message when blocked by (potential) range movement (CASSANDRA-13744)
 + * "ignore" option is ignored in sstableloader (CASSANDRA-13721)
 + * Deadlock in AbstractCommitLogSegmentManager (CASSANDRA-13652)
 + * Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512)
 + * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
 +Merged from 3.0:
+  * Improve short read protection performance (CASSANDRA-13794)
   * Fix sstable reader to support range-tombstone-marker for multi-slices (CASSANDRA-13787)
   * Fix short read protection for tables with no clustering columns (CASSANDRA-13880)
   * Make isBuilt volatile in PartitionUpdate (CASSANDRA-13619)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bae4ca9/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bae4ca9/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 4b0bd3c,9a98ee5..32fc015
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -43,12 -43,10 +43,12 @@@ public class DataResolver extends Respo
  {
      @VisibleForTesting
      final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
 +    private final long queryStartNanoTime;
  
-     public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime)
 -    DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
++    DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime)
      {
          super(keyspace, command, consistency, maxResponseCount);
 +        this.queryStartNanoTime = queryStartNanoTime;
      }
  
      public PartitionIterator getData()
@@@ -122,10 -123,23 +125,23 @@@
          if (!command.limits().isUnlimited())
          {
              for (int i = 0; i < results.size(); i++)
-                 results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter, queryStartNanoTime)));
+             {
+                 DataLimits.Counter singleResultCounter =
+                     command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount();
+ 
+                 ShortReadResponseProtection protection =
 -                    new ShortReadResponseProtection(sources[i], singleResultCounter, mergedResultCounter);
++                    new ShortReadResponseProtection(sources[i], singleResultCounter, mergedResultCounter, queryStartNanoTime);
+ 
+                 /*
+                  * The order of transformations is important here. See ShortReadResponseProtection.applyToPartition()
+                  * comments for details. We want singleResultCounter.applyToPartition() to be called after SRRP applies
+                  * its transformations, so that this order is preserved when calling applyToRows() too.
+                  */
+                 results.set(i, Transformation.apply(Transformation.apply(results.get(i), protection), singleResultCounter));
+             }
          }
  
-         return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener);
+         return UnfilteredPartitionIterators.merge(results, command.nowInSec(), new RepairMergeListener(sources));
      }
  
      private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
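
[To illustrate the transformation-ordering comment in the hunk above, here is a
toy, self-contained Java sketch. It is not from the patch: the stream pipeline,
class and variable names are invented stand-ins. Only the ordering mirrors the
patch's Transformation.apply(Transformation.apply(input, protection), singleResultCounter)
nesting, where the inner protection transformation observes each row before the
outer counter counts it.]

    import java.util.function.UnaryOperator;
    import java.util.stream.Stream;

    public class TransformationOrderSketch
    {
        public static void main(String[] args)
        {
            // stand-ins for SRRP and the per-source counter
            UnaryOperator<String> protection = row -> { System.out.println("protection sees: " + row); return row; };
            UnaryOperator<String> counter    = row -> { System.out.println("counter sees:    " + row); return row; };

            // the inner map() runs first, just as the inner Transformation.apply() does:
            // protection gets to transform (or fetch more) rows before the counter counts them
            Stream.of("row1", "row2")
                  .map(protection)   // inner: short read protection
                  .map(counter)      // outer: single-result counter
                  .forEach(row -> {});
        }
    }
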
@@@ -209,9 -223,9 +225,9 @@@
              // For each source, the time of the current deletion as known by the source.
              private final DeletionTime[] sourceDeletionTime = new DeletionTime[sources.length];
              // For each source, record if there is an open range to send as repair, and from where.
 -            private final Slice.Bound[] markerToRepair = new Slice.Bound[sources.length];
 +            private final ClusteringBound[] markerToRepair = new ClusteringBound[sources.length];
  
-             public MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed)
+             private MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed)
              {
                  this.partitionKey = partitionKey;
                  this.columns = columns;
@@@ -471,19 -473,18 +487,24 @@@
          }
      }
  
-     private class ShortReadProtection extends Transformation<UnfilteredRowIterator>
+     private class ShortReadResponseProtection extends Transformation<UnfilteredRowIterator>
      {
          private final InetAddress source;
-         private final DataLimits.Counter counter;
-         private final DataLimits.Counter postReconciliationCounter;
+ 
+         private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
+         private final DataLimits.Counter mergedResultCounter; // merged end-result counter
+ 
 -        private ShortReadResponseProtection(InetAddress source, DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
 +        private final long queryStartNanoTime;
 +
-         private ShortReadProtection(InetAddress source, DataLimits.Counter postReconciliationCounter, long queryStartNanoTime)
++        private ShortReadResponseProtection(InetAddress source,
++                                            DataLimits.Counter singleResultCounter,
++                                            DataLimits.Counter mergedResultCounter,
++                                            long queryStartNanoTime)
          {
              this.source = source;
-             this.counter = command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount();
-             this.postReconciliationCounter = postReconciliationCounter;
+             this.singleResultCounter = singleResultCounter;
+             this.mergedResultCounter = mergedResultCounter;
 +            this.queryStartNanoTime = queryStartNanoTime;
          }
  
          @Override
@@@ -523,100 -525,133 +545,141 @@@
                  return row;
              }
  
-             @Override
+             /*
+              * We have a potential short read if the result from a given node contains the requested number of rows
+              * for that partition (i.e. it has stopped returning results due to the limit), but some of them haven't
+              * made it into the final post-reconciliation result due to other nodes' tombstones.
+              *
+              * If that is the case, then that node may have more rows that we should fetch, as otherwise we could
+              * ultimately return fewer rows than required. Also, those additional rows may contain tombstones which
+              * we also need to fetch as they may shadow rows from other replicas' results, which we would
+              * otherwise return incorrectly.
+              *
+              * Also note that we only get here once all the rows for this partition have been iterated over, and so
+              * if the node had returned the requested number of rows but we still get here, then some results were
+              * skipped during reconciliation.
+              */
              public UnfilteredRowIterator moreContents()
              {
-                 assert !postReconciliationCounter.isDoneForPartition();
- 
-                 // We have a short read if the node this is the result of has returned the requested number of
-                 // rows for that partition (i.e. it has stopped returning results due to the limit), but some of
-                 // those results haven't made it in the final result post-reconciliation due to other nodes
-                 // tombstones. If that is the case, then the node might have more results that we should fetch
-                 // as otherwise we might return less results than required, or results that shouldn't be returned
-                 // (because the node has tombstone that hides future results from other nodes but that haven't
-                 // been returned due to the limit).
-                 // Also note that we only get here once all the results for this node have been returned, and so
-                 // if the node had returned the requested number but we still get there, it imply some results were
-                 // skipped during reconciliation.
-                 if (lastCount == counted(counter) || !counter.isDoneForPartition())
+                 // never try to request additional rows from replicas if our reconciled partition is already filled to the limit
+                 assert !mergedResultCounter.isDoneForPartition();
+ 
+                 // we do not apply short read protection when we have no limits at all
+                 assert !command.limits().isUnlimited();
+ 
+                 // if the returned partition doesn't have enough rows to satisfy even the original limit, don't ask for more
+                 if (!singleResultCounter.isDoneForPartition())
                      return null;
  
-                 // clustering of the last row returned is empty, meaning that there is only one row per partition,
-                 // and we already have it.
-                 if (lastClustering == Clustering.EMPTY)
+                 /*
+                  * If the replica has no live rows in the partition, don't try to fetch more.
+                  *
+                  * Note that the previous branch [if (!singleResultCounter.isDoneForPartition()) return null] doesn't
+                  * always cover this scenario:
+                  * isDoneForPartition() is defined as [isDone() || rowInCurrentPartition >= perPartitionLimit],
+                  * and will return true if isDone() returns true, even if there are 0 rows counted in the current partition.
+                  *
+                  * This can happen with a range read if after 1+ rounds of short read protection requests we managed to fetch
+                  * enough extra rows for other partitions to satisfy the singleResultCounter's total row limit, but only
+                  * have tombstones in the current partition.
+                  *
+                  * One other way we can hit this condition is when the partition only has a live static row and no regular
+                  * rows. In that scenario the counter will remain at 0 until the partition is closed - which happens after
+                  * the moreContents() call.
+                  */
 -                if (singleResultCounter.countedInCurrentPartition() == 0)
++                if (countedInCurrentPartition(singleResultCounter) == 0)
                      return null;
  
-                 lastCount = counted(counter);
- 
-                 // We need to try to query enough additional results to fulfill our query, but because we could still
-                 // get short reads on that additional query, just querying the number of results we miss may not be
-                 // enough. But we know that when this node answered n rows (counter.countedInCurrentPartition), only
-                 // x rows (postReconciliationCounter.countedInCurrentPartition()) made it in the final result.
-                 // So our ratio of live rows to requested rows is x/n, so since we miss n-x rows, we estimate that
-                 // we should request m rows so that m * x/n = n-x, that is m = (n^2/x) - n.
-                 // Also note that it's ok if we retrieve more results that necessary since our top level iterator is a
-                 // counting iterator.
-                 int n = countedInCurrentPartition(postReconciliationCounter);
-                 int x = countedInCurrentPartition(counter);
-                 int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
- 
-                 DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
-                 ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
-                 ClusteringIndexFilter retryFilter = lastClustering == null ? filter : filter.forPaging(metadata.comparator, lastClustering, false);
-                 SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(),
-                                                                                    command.nowInSec(),
-                                                                                    command.columnFilter(),
-                                                                                    command.rowFilter(),
-                                                                                    retryLimits,
-                                                                                    partitionKey,
-                                                                                    retryFilter);
- 
-                 Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
-                 Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().cfId).metric.shortReadProtectionRequests.mark();
- 
-                 return doShortReadRetry(cmd);
-             }
+                 /*
+                  * This is a table with no clustering columns, and has at most one row per partition - with EMPTY clustering.
+                  * We already have the row, so there is no point in asking for more from the partition.
+                  */
+                 if (Clustering.EMPTY == lastClustering)
+                     return null;
  
-             /**
-              * Returns the number of results counted by the counter.
-              *
-              * @param counter the counter.
-              * @return the number of results counted by the counter
-              */
-             private int counted(Counter counter)
-             {
-                 // We are interested by the number of rows but for GROUP BY queries 'counted' returns the number of
-                 // groups.
-                 if (command.limits().isGroupByLimit())
-                     return counter.rowCounted();
 -                lastFetched = singleResultCounter.countedInCurrentPartition() - lastCounted;
 -                lastCounted = singleResultCounter.countedInCurrentPartition();
++                lastFetched = countedInCurrentPartition(singleResultCounter) - lastCounted;
++                lastCounted = countedInCurrentPartition(singleResultCounter);
+ 
+                 // getting back fewer rows than we asked for means the partition on the replica has been fully consumed
+                 if (lastQueried > 0 && lastFetched < lastQueried)
+                     return null;
  
-                 return counter.counted();
+                 /*
+                  * At this point we know that:
+                  *     1. the replica returned [repeatedly?] as many rows as we asked for and potentially has more
+                  *        rows in the partition
+                  *     2. at least one of those returned rows was shadowed by a tombstone returned from another
+                  *        replica
+                  *     3. we haven't satisfied the client's limits yet, and should attempt to query for more rows to
+                  *        avoid a short read
+                  *
+                  * In the ideal scenario, we would get exactly min(a, b) or fewer rows from the next request, where a and b
+                  * are defined as follows:
+                  *     [a] limits.count() - mergedResultCounter.counted()
+                  *     [b] limits.perPartitionCount() - mergedResultCounter.countedInCurrentPartition()
+                  *
+                  * It would be naive to query for exactly that many rows, as it's possible and not unlikely
+                  * that some of the returned rows would also be shadowed by tombstones from other hosts.
+                  *
+                  * Note: we don't know, nor do we care, how many rows from the replica made it into the reconciled result;
+                  * we can only tell how many in total we queried for, and that [0, mrc.countedInCurrentPartition()) made it.
+                  *
+                  * In general, our goal should be to minimise the number of extra requests - *not* to minimise the number
+                  * of rows fetched: there is a high transactional cost for every individual request, but a relatively low
+                  * marginal cost for each extra row requested.
+                  *
+                  * As such it's better to overfetch than to underfetch extra rows from a host; but at the same
+                  * time we want to respect paging limits and not blow up spectacularly.
+                  *
+                  * Note: it's ok to retrieve more rows than necessary since singleResultCounter is not stopping and only
+                  * counts.
+                  *
+                  * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits,
+                  * but no fewer than 8 rows (an arbitrary round lower bound), to ensure that we won't fetch row by row
+                  * for SELECT DISTINCT queries (that set per partition limit to 1).
+                  *
+                  * See CASSANDRA-13794 for more details.
+                  */
+                 lastQueried = Math.max(Math.min(command.limits().count(), command.limits().perPartitionCount()), 8);
+ 
+                 ColumnFamilyStore.metricsFor(metadata.cfId).shortReadProtectionRequests.mark();
+                 Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source);
+ 
+                 return executeReadCommand(makeFetchAdditionalRowsReadCommand(lastQueried));
              }
  
-             /**
-              * Returns the number of results counted in the partition by the counter.
-              *
-              * @param counter the counter.
-              * @return the number of results counted in the partition by the counter
-              */
++            // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
 +            private int countedInCurrentPartition(Counter counter)
 +            {
-                 // We are interested by the number of rows but for GROUP BY queries 'countedInCurrentPartition' returns
-                 // the number of groups in the current partition.
-                 if (command.limits().isGroupByLimit())
-                     return counter.rowCountedInCurrentPartition();
++                return command.limits().isGroupByLimit()
++                     ? counter.rowCountedInCurrentPartition()
++                     : counter.countedInCurrentPartition();
++            }
 +
-                 return counter.countedInCurrentPartition();
+             private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery)
+             {
+                 ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
+                 if (null != lastClustering)
+                     filter = filter.forPaging(metadata.comparator, lastClustering, false);
+ 
+                 return SinglePartitionReadCommand.create(command.metadata(),
+                                                          command.nowInSec(),
+                                                          command.columnFilter(),
+                                                          command.rowFilter(),
+                                                          command.limits().forShortReadRetry(toQuery),
+                                                          partitionKey,
+                                                          filter);
              }
  
-             private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand)
+             private UnfilteredRowIterator executeReadCommand(SinglePartitionReadCommand cmd)
              {
-                 DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1, queryStartNanoTime);
-                 ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source), queryStartNanoTime);
 -                DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1);
 -                ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source));
++                DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1, queryStartNanoTime);
++                ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source), queryStartNanoTime);
+ 
                  if (StorageProxy.canDoLocalRequest(source))
-                     StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+                     StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
                  else
-                     MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
+                     MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler);
  
                  // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
                  handler.awaitResults();

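
[As a worked illustration of the retry-sizing change in the DataResolver diff
above: a standalone Java sketch, not part of the patch. The class and method
names and the example numbers are invented, but the two formulas are taken
directly from the removed and added lines.]

    public class ShortReadRetrySizing
    {
        // Old heuristic (removed lines): if the replica returned n rows and only x
        // survived reconciliation, assume a live ratio of x/n and request m rows
        // such that m * x/n = n - x, i.e. m = (n^2 / x) - n.
        static int oldToQuery(int n, int x)
        {
            return Math.max(((n * n) / Math.max(x, 1)) - n, 1);
        }

        // New sizing (added lines, CASSANDRA-13794): ignore the ratio and request
        // the smaller of the query's total and per-partition limits, but never
        // fewer than 8 rows, so that SELECT DISTINCT queries (per-partition limit
        // of 1) don't page row by row.
        static int newToQuery(int countLimit, int perPartitionLimit)
        {
            return Math.max(Math.min(countLimit, perPartitionLimit), 8);
        }

        public static void main(String[] args)
        {
            // replica returned 100 rows, only 1 survived reconciliation:
            System.out.println(oldToQuery(100, 1));   // 9900 rows requested by the old code
            System.out.println(newToQuery(100, 100)); // 100 rows requested by the new code
        }
    }
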
