Merge branch 'cassandra-3.11' into trunk

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/030ec1f0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/030ec1f0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/030ec1f0

Branch: refs/heads/trunk
Commit: 030ec1f056d0e0b9094ddf7fcd2a491cb8ddf621
Parents: 4809f42 2bae4ca
Author: Aleksey Yeschenko <alek...@yeschenko.com>
Authored: Wed Sep 20 17:47:32 2017 +0100
Committer: Aleksey Yeschenko <alek...@yeschenko.com>
Committed: Wed Sep 20 17:47:32 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   5 +
 .../apache/cassandra/service/DataResolver.java  | 303 +++++++++++--------
 3 files changed, 187 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/030ec1f0/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/030ec1f0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 5aecc9d,548de88..72a63f0
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -2654,10 -2635,15 +2654,15 @@@ public class ColumnFamilyStore implemen
          if (keyspace == null)
              return null;
  
 -        UUID id = Schema.instance.getId(ksName, cfName);
 -        if (id == null)
 +        TableMetadata table = Schema.instance.getTableMetadata(ksName, cfName);
 +        if (table == null)
              return null;
  
 -        return keyspace.getColumnFamilyStore(id);
 +        return keyspace.getColumnFamilyStore(table.id);
      }
+ 
 -    public static TableMetrics metricsFor(UUID tableId)
++    public static TableMetrics metricsFor(TableId tableId)
+     {
+         return getIfExists(tableId).metric;
+     }
  }
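
For illustration, a minimal caller-side sketch of the new ColumnFamilyStore.metricsFor(TableId) helper added in the hunk above. The wrapper class and method here are hypothetical; only metricsFor and the shortReadProtectionRequests meter come from this commit:

    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.metrics.TableMetrics;
    import org.apache.cassandra.schema.TableId;

    final class MetricsForExample
    {
        // Marks the short-read-protection meter for a table, mirroring the call
        // site introduced in DataResolver below. Assumes the table still exists,
        // since metricsFor() dereferences the ColumnFamilyStore it looks up.
        static void markShortReadProtection(TableId tableId)
        {
            TableMetrics metrics = ColumnFamilyStore.metricsFor(tableId);
            metrics.shortReadProtectionRequests.mark();
        }
    }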

http://git-wip-us.apache.org/repos/asf/cassandra/blob/030ec1f0/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 98e3285,32fc015..b0741da
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -27,10 -27,7 +27,9 @@@ import com.google.common.collect.Iterab
  
  import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.concurrent.StageManager;
 -import org.apache.cassandra.config.*;
 +import org.apache.cassandra.schema.ColumnMetadata;
- import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.filter.*;
  import org.apache.cassandra.db.filter.DataLimits.Counter;
@@@ -88,26 -99,22 +101,19 @@@ public class DataResolver extends Respo
           * See CASSANDRA-13747 for more details.
           */
  
-         DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
+         DataLimits.Counter mergedResultCounter =
+             command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
  
-         UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, counter);
-         FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
-         PartitionIterator counted = counter.applyTo(filtered);
+         UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, mergedResultCounter);
+         FilteredPartitions filtered =
+             FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
+         PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
 -
 -        return command.isForThrift()
 -             ? counted
 -             : Transformation.apply(counted, new EmptyPartitionsDiscarder());
 +        return Transformation.apply(counted, new EmptyPartitionsDiscarder());
      }
  
-     public void compareResponses()
-     {
-         // We need to fully consume the results to trigger read repairs if appropriate
-         try (PartitionIterator iterator = resolve())
-         {
-             PartitionIterators.consume(iterator);
-         }
-     }
- 
      private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
                                                                       InetAddress[] sources,
-                                                                      DataLimits.Counter resultCounter)
+                                                                      DataLimits.Counter mergedResultCounter)
      {
          // If we have only one result, there is no read repair to do and we can't get short reads
          if (results.size() == 1)
@@@ -209,7 -227,7 +226,7 @@@
              // For each source, record if there is an open range to send as repair, and from where.
              private final ClusteringBound[] markerToRepair = new ClusteringBound[sources.length];
  
-             public MergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed)
 -            private MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed)
++            private MergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed)
              {
                  this.partitionKey = partitionKey;
                  this.columns = columns;
@@@ -503,12 -523,16 +522,16 @@@
  
          private class ShortReadRowProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
          {
-             final TableMetadata metadata;
-             final DecoratedKey partitionKey;
-             Clustering lastClustering;
-             int lastCount = 0;
 -            private final CFMetaData metadata;
++            private final TableMetadata metadata;
+             private final DecoratedKey partitionKey;
+ 
+             private Clustering lastClustering;
+ 
+             private int lastCounted = 0; // last seen recorded # before attempting to fetch more rows
+             private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command)
+             private int lastQueried = 0; // # extra rows requested from the replica last time
  
 -            private ShortReadRowProtection(CFMetaData metadata, DecoratedKey partitionKey)
 +            private ShortReadRowProtection(TableMetadata metadata, DecoratedKey partitionKey)
              {
                  this.metadata = metadata;
                  this.partitionKey = partitionKey;
@@@ -521,100 -545,141 +544,141 @@@
                  return row;
              }
  
-             @Override
+             /*
+              * We have a potential short read if the result from a given node contains the requested number of rows
+              * for that partition (i.e. it has stopped returning results due to the limit), but some of them haven't
+              * made it into the final post-reconciliation result due to other nodes' tombstones.
+              *
+              * If that is the case, then that node may have more rows that we should fetch, as otherwise we could
+              * ultimately return fewer rows than required. Also, those additional rows may contain tombstones
+              * which we also need to fetch as they may shadow rows from other replicas' results, which we would
+              * otherwise return incorrectly.
+              *
+              * Also note that we only get here once all the rows for this partition have been iterated over, and so
+              * if the node had returned the requested number of rows but we still get here, then some results were
+              * skipped during reconciliation.
+              */
              public UnfilteredRowIterator moreContents()
              {
-                 assert !postReconciliationCounter.isDoneForPartition();
- 
-                 // We have a short read if the node this is the result of has returned the requested number of
-                 // rows for that partition (i.e. it has stopped returning results due to the limit), but some of
-                 // those results haven't made it in the final result post-reconciliation due to other nodes
-                 // tombstones. If that is the case, then the node might have more results that we should fetch
-                 // as otherwise we might return less results than required, or results that shouldn't be returned
-                 // (because the node has tombstone that hides future results from other nodes but that haven't
-                 // been returned due to the limit).
-                 // Also note that we only get here once all the results for this node have been returned, and so
-                 // if the node had returned the requested number but we still get there, it imply some results were
-                 // skipped during reconciliation.
-                 if (lastCount == counted(counter) || !counter.isDoneForPartition())
+                 // never try to request additional rows from replicas if our reconciled partition is already filled to the limit
+                 assert !mergedResultCounter.isDoneForPartition();
+ 
+                 // we do not apply short read protection when we have no limits at all
+                 assert !command.limits().isUnlimited();
+ 
+                 // if the returned partition doesn't have enough rows to satisfy even the original limit, don't ask for more
+                 if (!singleResultCounter.isDoneForPartition())
                      return null;
  
-                 // clustering of the last row returned is empty, meaning that there is only one row per partition,
-                 // and we already have it.
-                 if (lastClustering == Clustering.EMPTY)
+                 /*
+                  * If the replica has no live rows in the partition, don't try to fetch more.
+                  *
+                  * Note that the previous branch [if (!singleResultCounter.isDoneForPartition()) return null] doesn't
+                  * always cover this scenario:
+                  * isDoneForPartition() is defined as [isDone() || rowInCurrentPartition >= perPartitionLimit],
+                  * and will return true if isDone() returns true, even if there are 0 rows counted in the current partition.
+                  *
+                  * This can happen with a range read if after 1+ rounds of short read protection requests we managed to fetch
+                  * enough extra rows for other partitions to satisfy the singleResultCounter's total row limit, but only
+                  * have tombstones in the current partition.
+                  *
+                  * One other way we can hit this condition is when the partition only has a live static row and no regular
+                  * rows. In that scenario the counter will remain at 0 until the partition is closed - which happens after
+                  * the moreContents() call.
+                  */
+                 if (countedInCurrentPartition(singleResultCounter) == 0)
                      return null;
  
-                 lastCount = counted(counter);
- 
-                 // We need to try to query enough additional results to fulfill our query, but because we could still
-                 // get short reads on that additional query, just querying the number of results we miss may not be
-                 // enough. But we know that when this node answered n rows (counter.countedInCurrentPartition), only
-                 // x rows (postReconciliationCounter.countedInCurrentPartition()) made it in the final result.
-                 // So our ratio of live rows to requested rows is x/n, so since we miss n-x rows, we estimate that
-                 // we should request m rows so that m * x/n = n-x, that is m = (n^2/x) - n.
-                 // Also note that it's ok if we retrieve more results that necessary since our top level iterator is a
-                 // counting iterator.
-                 int n = countedInCurrentPartition(postReconciliationCounter);
-                 int x = countedInCurrentPartition(counter);
-                 int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
- 
-                 DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
-                 ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
-                 ClusteringIndexFilter retryFilter = lastClustering == null ? filter : filter.forPaging(metadata.comparator, lastClustering, false);
-                 SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(),
-                                                                                    command.nowInSec(),
-                                                                                    command.columnFilter(),
-                                                                                    command.rowFilter(),
-                                                                                    retryLimits,
-                                                                                    partitionKey,
-                                                                                    retryFilter);
- 
-                 Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
-                 Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().id).metric.shortReadProtectionRequests.mark();
- 
-                 return doShortReadRetry(cmd);
-             }
+                 /*
+                  * This is a table with no clustering columns, and has at most one row per partition - with EMPTY clustering.
+                  * We already have the row, so there is no point in asking for more from the partition.
+                  */
+                 if (Clustering.EMPTY == lastClustering)
+                     return null;
  
-             /**
-              * Returns the number of results counted by the counter.
-              *
-              * @param counter the counter.
-              * @return the number of results counted by the counter
-              */
-             private int counted(Counter counter)
-             {
-                 // We are interested by the number of rows but for GROUP BY queries 'counted' returns the number of
-                 // groups.
-                 if (command.limits().isGroupByLimit())
-                     return counter.rowCounted();
+                 lastFetched = countedInCurrentPartition(singleResultCounter) - lastCounted;
+                 lastCounted = countedInCurrentPartition(singleResultCounter);
+ 
+                 // getting back fewer rows than we asked for means the partition on the replica has been fully consumed
+                 if (lastQueried > 0 && lastFetched < lastQueried)
+                     return null;
  
-                 return counter.counted();
+                 /*
+                  * At this point we know that:
+                  *     1. the replica returned [repeatedly?] as many rows as we asked for and potentially has more
+                  *        rows in the partition
+                  *     2. at least one of those returned rows was shadowed by a tombstone returned from another
+                  *        replica
+                  *     3. we haven't satisfied the client's limits yet, and should attempt to query for more rows to
+                  *        avoid a short read
+                  *
+                  * In the ideal scenario, we would get exactly min(a, b) or fewer rows from the next request, where a and b
+                  * are defined as follows:
+                  *     [a] limits.count() - mergedResultCounter.counted()
+                  *     [b] limits.perPartitionCount() - mergedResultCounter.countedInCurrentPartition()
+                  *
+                  * It would be naive to query for exactly that many rows, as it's possible and not unlikely
+                  * that some of the returned rows would also be shadowed by tombstones from other hosts.
+                  *
+                  * Note: we don't know, nor do we care, how many rows from the replica made it into the reconciled result;
+                  * we can only tell how many in total we queried for, and that [0, mrc.countedInCurrentPartition()) made it.
+                  *
+                  * In general, our goal should be to minimise the number of extra requests - *not* to minimise the number
+                  * of rows fetched: there is a high transactional cost for every individual request, but a relatively low
+                  * marginal cost for each extra row requested.
+                  *
+                  * As such it's better to overfetch than to underfetch extra rows from a host; but at the same
+                  * time we want to respect paging limits and not blow up spectacularly.
+                  *
+                  * Note: it's ok to retrieve more rows than necessary since singleResultCounter is not stopping and only
+                  * counts.
+                  *
+                  * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits,
+                  * but no fewer than 8 rows (an arbitrary round lower bound), to ensure that we won't fetch row by row
+                  * for SELECT DISTINCT queries (that set per partition limit to 1).
+                  *
+                  * See CASSANDRA-13794 for more details.
+                  */
+                 lastQueried = Math.max(Math.min(command.limits().count(), command.limits().perPartitionCount()), 8);
+ 
 -                ColumnFamilyStore.metricsFor(metadata.cfId).shortReadProtectionRequests.mark();
++                ColumnFamilyStore.metricsFor(metadata.id).shortReadProtectionRequests.mark();
+                 Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source);
+ 
+                 return executeReadCommand(makeFetchAdditionalRowsReadCommand(lastQueried));
              }
  
-             /**
-              * Returns the number of results counted in the partition by the counter.
-              *
-              * @param counter the counter.
-              * @return the number of results counted in the partition by the counter
-              */
+             // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
              private int countedInCurrentPartition(Counter counter)
              {
-                 // We are interested by the number of rows but for GROUP BY queries 'countedInCurrentPartition' returns
-                 // the number of groups in the current partition.
-                 if (command.limits().isGroupByLimit())
-                     return counter.rowCountedInCurrentPartition();
+                 return command.limits().isGroupByLimit()
+                      ? counter.rowCountedInCurrentPartition()
+                      : counter.countedInCurrentPartition();
+             }
  
-                 return counter.countedInCurrentPartition();
+             private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery)
+             {
+                 ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
+                 if (null != lastClustering)
+                     filter = filter.forPaging(metadata.comparator, lastClustering, false);
+ 
+                 return SinglePartitionReadCommand.create(command.metadata(),
+                                                          command.nowInSec(),
+                                                          command.columnFilter(),
+                                                          command.rowFilter(),
+                                                          command.limits().forShortReadRetry(toQuery),
+                                                          partitionKey,
+                                                          filter);
              }
  
-             private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand)
+             private UnfilteredRowIterator executeReadCommand(SinglePartitionReadCommand cmd)
              {
-                 DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1, queryStartNanoTime);
-                 ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source), queryStartNanoTime);
+                 DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1, queryStartNanoTime);
+                 ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source), queryStartNanoTime);
+ 
                  if (StorageProxy.canDoLocalRequest(source))
-                     StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+                     StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
                  else
-                     MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), source, handler);
 -                    MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler);
++                    MessagingService.instance().sendRRWithFailure(cmd.createMessage(), source, handler);
  
                  // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
                  handler.awaitResults();
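
To make the retry-sizing change above concrete, here is a small self-contained sketch (the class and main are hypothetical; only the two formulas come from this diff) contrasting the removed quadratic estimate m = n^2/x - n with the new min(count(), perPartitionCount()) bound floored at 8:

    // Standalone sketch of the two short-read retry-sizing strategies.
    final class ShortReadRetrySizing
    {
        // Old heuristic: if the replica returned n rows but only x survived
        // reconciliation, request m rows such that m * x/n = n - x, i.e.
        // m = n^2/x - n (at least 1).
        static int oldToQuery(int n, int x)
        {
            return Math.max(((n * n) / Math.max(x, 1)) - n, 1);
        }

        // New rule (CASSANDRA-13794): request min(count, perPartitionCount),
        // but never fewer than 8 rows, so SELECT DISTINCT-style queries
        // (per-partition limit of 1) don't fetch row by row.
        static int newLastQueried(int count, int perPartitionCount)
        {
            return Math.max(Math.min(count, perPartitionCount), 8);
        }

        public static void main(String[] args)
        {
            // Replica returned 100 rows, 99 survived reconciliation: the old
            // rule asks for 100*100/99 - 100 = 1 row, potentially costing one
            // round trip per missing row.
            System.out.println(oldToQuery(100, 99));    // 1
            // New rule with LIMIT 100 and per-partition limit 1 asks for 8.
            System.out.println(newLastQueried(100, 1)); // 8
        }
    }

The new rule trades a few potentially wasted rows per request for far fewer round trips, which matches the comment's goal of minimising the number of extra requests rather than the number of rows fetched.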

