This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new abdf508  Count vnode ranges towards concurrency factor instead of merged ranges and cap max concurrency factor by core * 10
abdf508 is described below

commit abdf5085d4381351054bc2c0976bc826f4ac82e2
Author: Zhao Yang <zhaoyangsingap...@gmail.com>
AuthorDate: Mon Jun 22 15:34:22 2020 +0100

    Count vnode ranges towards concurrency factor instead of merged ranges and cap max concurrency factor by core * 10

    patch by Zhao Yang; reviewed by Andres de la Peña, Caleb Rackliffe for CASSANDRA-15752
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/service/StorageProxy.java | 115 +++++++++++++----
 .../cassandra/db/PartitionRangeReadTest.java       | 143 +++++++++++++++++++++
 3 files changed, 235 insertions(+), 24 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index d1b1416..dc50ff5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.21
+ * Fixed range read concurrency factor computation and capped at 10 times tpc cores (CASSANDRA-15752)
 * Catch exception on bootstrap resume and init native transport (CASSANDRA-15863)
 * Fix replica-side filtering returning stale data with CL > ONE (CASSANDRA-8272, CASSANDRA-8273)
 * Fix duplicated row on 2.x upgrades when multi-rows range tombstones interact with collection ones (CASSANDRA-15805)
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 19cd901..c7888c4 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Predicate;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.*;
 import com.google.common.primitives.Ints;
@@ -99,6 +100,15 @@ public class StorageProxy implements StorageProxyMBean
 
     private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
 
+    /**
+     * Introduce a maximum number of sub-ranges that the coordinator can request in parallel for range queries. Previously
+     * we would request up to the maximum number of ranges, but this causes problems if the number of vnodes is large.
+     * By default we pick 10 requests per core, assuming all replicas have the same number of cores. The idea is that we
+     * don't want a burst of range requests that will back up, hurting all other queries. At the same time,
+     * we want to give range queries a chance to run if resources are available.
+     */
+    private static final int MAX_CONCURRENT_RANGE_REQUESTS = Math.max(1, Integer.getInteger("cassandra.max_concurrent_range_requests", FBUtilities.getAvailableProcessors() * 10));
+
     private StorageProxy()
     {
     }
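For readers skimming the patch: the new cap defaults to ten concurrent sub-range requests per core and can be overridden with the cassandra.max_concurrent_range_requests system property. A minimal standalone sketch of the same computation (the class name is invented for illustration, and Runtime stands in for FBUtilities.getAvailableProcessors()):

    // Illustrative sketch only, not part of the patch.
    public class MaxRangeRequestsSketch
    {
        public static void main(String[] args)
        {
            int cores = Runtime.getRuntime().availableProcessors(); // stand-in for FBUtilities.getAvailableProcessors()
            // Default: 10 requests per core; -Dcassandra.max_concurrent_range_requests=N overrides it; never below 1.
            int cap = Math.max(1, Integer.getInteger("cassandra.max_concurrent_range_requests", cores * 10));
            System.out.println("max concurrent range requests: " + cap); // e.g. 160 on a 16-core node
        }
    }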
@@ -1838,21 +1848,33 @@ public class StorageProxy implements StorageProxyMBean
         return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor();
     }
 
-    private static class RangeForQuery
+    @VisibleForTesting
+    public static class RangeForQuery
     {
         public final AbstractBounds<PartitionPosition> range;
         public final List<InetAddress> liveEndpoints;
         public final List<InetAddress> filteredEndpoints;
+        public final int vnodeCount;
 
-        public RangeForQuery(AbstractBounds<PartitionPosition> range, List<InetAddress> liveEndpoints, List<InetAddress> filteredEndpoints)
+        public RangeForQuery(AbstractBounds<PartitionPosition> range,
+                             List<InetAddress> liveEndpoints,
+                             List<InetAddress> filteredEndpoints,
+                             int vnodeCount)
         {
             this.range = range;
             this.liveEndpoints = liveEndpoints;
             this.filteredEndpoints = filteredEndpoints;
+            this.vnodeCount = vnodeCount;
+        }
+
+        public int vnodeCount()
+        {
+            return vnodeCount;
         }
     }
 
-    private static class RangeIterator extends AbstractIterator<RangeForQuery>
+    @VisibleForTesting
+    public static class RangeIterator extends AbstractIterator<RangeForQuery>
     {
         private final Keyspace keyspace;
         private final ConsistencyLevel consistency;
@@ -1885,17 +1907,19 @@ public class StorageProxy implements StorageProxyMBean
             List<InetAddress> liveEndpoints = getLiveSortedEndpoints(keyspace, range.right);
             return new RangeForQuery(range,
                                      liveEndpoints,
-                                     consistency.filterForQuery(keyspace, liveEndpoints));
+                                     consistency.filterForQuery(keyspace, liveEndpoints),
+                                     1);
         }
     }
 
-    private static class RangeMerger extends AbstractIterator<RangeForQuery>
+    @VisibleForTesting
+    public static class RangeMerger extends AbstractIterator<RangeForQuery>
     {
         private final Keyspace keyspace;
         private final ConsistencyLevel consistency;
         private final PeekingIterator<RangeForQuery> ranges;
 
-        private RangeMerger(Iterator<RangeForQuery> iterator, Keyspace keyspace, ConsistencyLevel consistency)
+        public RangeMerger(Iterator<RangeForQuery> iterator, Keyspace keyspace, ConsistencyLevel consistency)
         {
             this.keyspace = keyspace;
             this.consistency = consistency;
@@ -1937,7 +1961,8 @@ public class StorageProxy implements StorageProxyMBean
                     break;
 
                 // If we get there, merge this range and the next one
-                current = new RangeForQuery(current.range.withNewRight(next.range.right), merged, filteredMerged);
+                int vnodeCount = current.vnodeCount + next.vnodeCount;
+                current = new RangeForQuery(current.range.withNewRight(next.range.right), merged, filteredMerged, vnodeCount);
                 ranges.next(); // consume the range we just merged since we've only peeked so far
             }
             return current;
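The key bookkeeping change is that a merged range now remembers how many vnode ranges it covers, as shown in the toy model below (an invented class, not Cassandra code): merging contiguous vnode ranges that share a replica set yields one request that still counts as several ranges toward the concurrency factor.

    // Toy model of RangeMerger's new bookkeeping; the class name and numbers are illustrative.
    public class VnodeCountSketch
    {
        public static void main(String[] args)
        {
            int[] vnodeCounts = { 1, 1, 1, 1 }; // four contiguous vnode ranges with identical replicas
            int mergedVnodeCount = 0;
            for (int count : vnodeCounts)
                mergedVnodeCount += count;      // mirrors: vnodeCount = current.vnodeCount + next.vnodeCount
            System.out.println(mergedVnodeCount); // 4: one merged request, four vnode ranges counted
        }
    }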
@@ -1982,7 +2007,7 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
+    public static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
     {
         private final Iterator<RangeForQuery> ranges;
         private final int totalRangeCount;
@@ -1995,19 +2020,28 @@ public class StorageProxy implements StorageProxyMBean
 
         private DataLimits.Counter counter;
         private PartitionIterator sentQueryIterator;
 
+        private final int maxConcurrencyFactor;
         private int concurrencyFactor;
         // The two following "metrics" are maintained to improve the concurrencyFactor
         // when it was not good enough initially.
         private int liveReturned;
         private int rangesQueried;
+        private int batchesRequested = 0;
 
-        public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency)
+        public RangeCommandIterator(Iterator<RangeForQuery> ranges,
+                                    PartitionRangeReadCommand command,
+                                    int concurrencyFactor,
+                                    int maxConcurrencyFactor,
+                                    int totalRangeCount,
+                                    Keyspace keyspace,
+                                    ConsistencyLevel consistency)
         {
             this.command = command;
             this.concurrencyFactor = concurrencyFactor;
+            this.maxConcurrencyFactor = maxConcurrencyFactor;
             this.startTime = System.nanoTime();
-            this.ranges = new RangeMerger(ranges, keyspace, consistency);
-            this.totalRangeCount = ranges.rangeCount();
+            this.ranges = ranges;
+            this.totalRangeCount = totalRangeCount;
             this.consistency = consistency;
             this.keyspace = keyspace;
             this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
@@ -2026,7 +2060,6 @@ public class StorageProxy implements StorageProxyMBean
             // else, sends the next batch of concurrent queries (after having closed the previous iterator)
             if (sentQueryIterator != null)
             {
-                liveReturned += counter.counted();
                 sentQueryIterator.close();
 
                 // It's not the first batch of queries and we're not done, so we can use what has been
@@ -2057,20 +2090,31 @@ public class StorageProxy implements StorageProxyMBean
 
         private void updateConcurrencyFactor()
         {
+            liveReturned += counter.counted();
+
+            concurrencyFactor = computeConcurrencyFactor(totalRangeCount, rangesQueried, maxConcurrencyFactor, command.limits().count(), liveReturned);
+        }
+
+        @VisibleForTesting
+        public static int computeConcurrencyFactor(int totalRangeCount, int rangesQueried, int maxConcurrencyFactor, int limit, int liveReturned)
+        {
+            maxConcurrencyFactor = Math.max(1, Math.min(maxConcurrencyFactor, totalRangeCount - rangesQueried));
             if (liveReturned == 0)
             {
-                // we haven't actually gotten any results, so query all remaining ranges at once
-                concurrencyFactor = totalRangeCount - rangesQueried;
-                return;
+                // we haven't actually gotten any results yet, so query up to the max concurrency factor
+                Tracing.trace("Didn't get any response rows; new concurrent requests: {}", maxConcurrencyFactor);
+                return maxConcurrencyFactor;
             }
 
             // Otherwise, compute how many rows per range we got on average and pick a concurrency factor
             // that should allow us to fetch all remaining rows with the next batch of (concurrent) queries.
-            int remainingRows = command.limits().count() - liveReturned;
+            int remainingRows = limit - liveReturned;
             float rowsPerRange = (float)liveReturned / (float)rangesQueried;
-            concurrencyFactor = Math.max(1, Math.min(totalRangeCount - rangesQueried, Math.round(remainingRows / rowsPerRange)));
+            int concurrencyFactor = Math.max(1, Math.min(maxConcurrencyFactor, Math.round(remainingRows / rowsPerRange)));
             logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
-                         rowsPerRange, (int) remainingRows, concurrencyFactor);
+                         rowsPerRange, remainingRows, concurrencyFactor);
+
+            return concurrencyFactor;
         }
 
         private SingleRangeResponse query(RangeForQuery toQuery)
         {
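To make the capped formula concrete, here is a worked example (a standalone sketch with arbitrary numbers, mirroring the first cases of the new unit test further below; the class name is invented):

    // Standalone sketch of computeConcurrencyFactor's arithmetic; values are arbitrary.
    public class ConcurrencyFactorSketch
    {
        public static void main(String[] args)
        {
            int totalRangeCount = 100, rangesQueried = 30, maxConcurrencyFactor = 32;
            int limit = 500, liveReturned = 240;

            int cap = Math.max(1, Math.min(maxConcurrencyFactor, totalRangeCount - rangesQueried)); // min(32, 70) = 32
            int remainingRows = limit - liveReturned;                          // 260
            float rowsPerRange = (float) liveReturned / (float) rangesQueried; // 8.0
            int concurrencyFactor = Math.max(1, Math.min(cap, Math.round(remainingRows / rowsPerRange)));
            System.out.println(concurrencyFactor); // 32: round(260 / 8) = 33, capped at 32
        }
    }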
@@ -2106,11 +2150,14 @@ public class StorageProxy implements StorageProxyMBean
         private PartitionIterator sendNextRequests()
         {
             List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor);
-            for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++)
+            for (int i = 0; i < concurrencyFactor && ranges.hasNext();)
             {
-                concurrentQueries.add(query(ranges.next()));
-                ++rangesQueried;
+                RangeForQuery range = ranges.next();
+                concurrentQueries.add(query(range));
+                rangesQueried += range.vnodeCount();
+                i += range.vnodeCount();
             }
+            batchesRequested++;
 
             Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size());
             // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to
@@ -2133,6 +2180,18 @@ public class StorageProxy implements StorageProxyMBean
                 Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
             }
         }
+
+        @VisibleForTesting
+        public int rangesQueried()
+        {
+            return rangesQueried;
+        }
+
+        @VisibleForTesting
+        public int batchesRequested()
+        {
+            return batchesRequested;
+        }
     }
 
     @SuppressWarnings("resource")
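Note that the loop in sendNextRequests now spends the concurrency budget in vnode ranges rather than in requests: a merged range covering several vnodes consumes several units. A toy illustration (invented class, arbitrary numbers):

    // Toy model of sendNextRequests' budget accounting; the class name and numbers are illustrative.
    public class BatchBudgetSketch
    {
        public static void main(String[] args)
        {
            int concurrencyFactor = 5;
            int[] nextRangeVnodeCounts = { 5, 3 }; // vnode counts of the next merged ranges
            int requestsSent = 0;
            for (int i = 0, r = 0; i < concurrencyFactor && r < nextRangeVnodeCounts.length; r++)
            {
                i += nextRangeVnodeCounts[r]; // mirrors: i += range.vnodeCount()
                requestsSent++;
            }
            System.out.println(requestsSent); // 1: the first merged range alone fills the budget
        }
    }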
@@ -2148,16 +2207,24 @@ public class StorageProxy implements StorageProxyMBean
         // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
         // fetch enough rows in the first round
         resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
+        int maxConcurrencyFactor = Math.min(ranges.rangeCount(), MAX_CONCURRENT_RANGE_REQUESTS);
         int concurrencyFactor = resultsPerRange == 0.0
                               ? 1
-                              : Math.max(1, Math.min(ranges.rangeCount(), (int) Math.ceil(command.limits().count() / resultsPerRange)));
+                              : Math.max(1, Math.min(maxConcurrencyFactor, (int) Math.ceil(command.limits().count() / resultsPerRange)));
         logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
                      resultsPerRange, command.limits().count(), ranges.rangeCount(), concurrencyFactor);
         Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)",
                       ranges.rangeCount(), concurrencyFactor, resultsPerRange);
 
         // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
-
-        return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)),
+        RangeMerger mergedRanges = new RangeMerger(ranges, keyspace, consistencyLevel);
+        RangeCommandIterator rangeCommandIterator = new RangeCommandIterator(mergedRanges,
+                                                                             command,
+                                                                             concurrencyFactor,
+                                                                             maxConcurrencyFactor,
+                                                                             ranges.rangeCount(),
+                                                                             keyspace,
+                                                                             consistencyLevel);
+        return command.limits().filter(command.postReconciliationProcessing(rangeCommandIterator),
                                        command.nowInSec(), command.selectsFullPartition(), command.metadata().enforceStrictLiveness());
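For the initial factor, a worked example with arbitrary numbers shows where the new cap bites (a sketch, not part of the patch; the cap of 160 assumes a 16-core node with the default property):

    // Standalone sketch of getRangeSlice's initial concurrency factor; values are arbitrary.
    public class InitialFactorSketch
    {
        public static void main(String[] args)
        {
            double resultsPerRange = 0.5 - 0.5 * 0.10; // 10% margin (CONCURRENT_SUBREQUESTS_MARGIN) => 0.45
            int limit = 100, rangeCount = 256, maxConcurrentRangeRequests = 160;

            int maxConcurrencyFactor = Math.min(rangeCount, maxConcurrentRangeRequests); // 160
            int concurrencyFactor = resultsPerRange == 0.0
                                  ? 1
                                  : Math.max(1, Math.min(maxConcurrencyFactor, (int) Math.ceil(limit / resultsPerRange)));
            System.out.println(concurrencyFactor); // 160: ceil(100 / 0.45) = 223, capped at 160 (uncapped before this patch)
        }
    }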
diff --git a/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java b/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
index 1368705..b567f72 100644
--- a/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
+++ b/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
@@ -19,26 +19,41 @@
 package org.apache.cassandra.db;
 
 import java.math.BigInteger;
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.collect.Iterators;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.cassandra.db.ConsistencyLevel.ONE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.cassandra.*;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.marshal.IntegerType;
 import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class PartitionRangeReadTest
 {
@@ -48,6 +63,8 @@ public class PartitionRangeReadTest
     public static final String CF_STANDARDINT = "StandardInteger1";
     public static final String CF_COMPACT1 = "Compact1";
 
+    private static final List<InetAddress> LOCAL = Collections.singletonList(FBUtilities.getBroadcastAddress());
+
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
@@ -388,5 +405,131 @@ public class PartitionRangeReadTest
 //        assertColumnNames(row1, "c1", "c2");
 //        assertColumnNames(row2, "c1");
 //    }
+
+    @Test
+    public void testComputeConcurrencyFactor()
+    {
+        int maxConcurrentRangeRequest = 32;
+
+        // no live row returned, fetch all remaining ranges but hit the max instead
+        int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConcurrentRangeRequest, 500, 0);
+        assertEquals(maxConcurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConcurrentRangeRequest
+
+        // no live row returned, fetch all remaining ranges
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConcurrentRangeRequest, 500, 0);
+        assertEquals(20, cf); // because 100 - 80 = 20 < maxConcurrentRangeRequest
+
+        // returned half the rows, fetch rangesQueried again but hit the max instead
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConcurrentRangeRequest, 480, 240);
+        assertEquals(maxConcurrentRangeRequest, cf); // because 60 > maxConcurrentRangeRequest
+
+        // returned half the rows, fetch rangesQueried again
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConcurrentRangeRequest, 480, 240);
+        assertEquals(30, cf); // because 30 < maxConcurrentRangeRequest
+
+        // returned most of the rows, 1 more range to fetch
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConcurrentRangeRequest, 480, 479);
+        assertEquals(1, cf); // because 1 < maxConcurrentRangeRequest
+    }
+
+    @Test
+    public void testRangeCountWithRangeMerge()
+    {
+        List<Token> tokens = setTokens(Arrays.asList(100, 200, 300, 400));
+        int vnodeCount = 0;
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        List<StorageProxy.RangeForQuery> ranges = new ArrayList<>();
+        for (int i = 0; i + 1 < tokens.size(); i++)
+        {
+            Range<PartitionPosition> range = Range.makeRowRange(tokens.get(i), tokens.get(i + 1));
+            ranges.add(new StorageProxy.RangeForQuery(range, LOCAL, LOCAL, 1));
+            vnodeCount++;
+        }
+
+        StorageProxy.RangeMerger merge = new StorageProxy.RangeMerger(ranges.iterator(), keyspace, ONE);
+        StorageProxy.RangeForQuery mergedRange = Iterators.getOnlyElement(merge);
+        // all ranges are merged as the test has only one node
+        assertEquals(vnodeCount, mergedRange.vnodeCount());
+    }
+
+    @Test
+    public void testRangeQueried()
+    {
+        List<Token> tokens = setTokens(Arrays.asList(100, 200, 300, 400));
+        int vnodeCount = tokens.size() + 1; // n tokens divide the token ring into n+1 ranges
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.clearUnsafe();
+
+        int rows = 100;
+        for (int i = 0; i < rows; ++i)
+        {
+            RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata, 10, String.valueOf(i));
+            builder.clustering("c");
+            builder.add("val", String.valueOf(i));
+            builder.build().applyUnsafe();
+        }
+        cfs.forceBlockingFlush();
+
+        PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
+
+        // without range merger, there will be 2 batches requested: 1st batch with 1 range and 2nd batch with the remaining ranges
+        Iterator<StorageProxy.RangeForQuery> ranges = rangeIterator(command, keyspace, false);
+        StorageProxy.RangeCommandIterator data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1000, vnodeCount, keyspace, ONE);
+        verifyRangeCommandIterator(data, rows, 2, vnodeCount);
+
+        // without range merger and initial cf=5, there will be 1 batch requested: 5 vnode ranges in the 1st batch
+        ranges = rangeIterator(command, keyspace, false);
+        data = new StorageProxy.RangeCommandIterator(ranges, command, vnodeCount, 1000, vnodeCount, keyspace, ONE);
+        verifyRangeCommandIterator(data, rows, 1, vnodeCount);
+
+        // without range merger and max cf=1, there will be 5 batches requested: 1 vnode range per batch
+        ranges = rangeIterator(command, keyspace, false);
+        data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1, vnodeCount, keyspace, ONE);
+        verifyRangeCommandIterator(data, rows, vnodeCount, vnodeCount);
+
+        // with range merger, there will be only 1 batch requested, as all ranges share the same replica - localhost
+        ranges = rangeIterator(command, keyspace, true);
+        data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1000, vnodeCount, keyspace, ONE);
+        verifyRangeCommandIterator(data, rows, 1, vnodeCount);
+
+        // with range merger and max cf=1, there will be only 1 batch requested, as all ranges share the same replica - localhost
+        ranges = rangeIterator(command, keyspace, true);
+        data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1, vnodeCount, keyspace, ONE);
+        verifyRangeCommandIterator(data, rows, 1, vnodeCount);
+    }
+
+    private Iterator<StorageProxy.RangeForQuery> rangeIterator(PartitionRangeReadCommand command, Keyspace keyspace, boolean withRangeMerger)
+    {
+        Iterator<StorageProxy.RangeForQuery> ranges = new StorageProxy.RangeIterator(command, keyspace, ONE);
+        if (withRangeMerger)
+            ranges = new StorageProxy.RangeMerger(ranges, keyspace, ONE);
+
+        return ranges;
+    }
+
+    private void verifyRangeCommandIterator(StorageProxy.RangeCommandIterator data, int rows, int batches, int vnodeCount)
+    {
+        int num = Util.size(data);
+        assertEquals(rows, num);
+        assertEquals(batches, data.batchesRequested());
+        assertEquals(vnodeCount, data.rangesQueried());
+    }
+
+    private List<Token> setTokens(List<Integer> values)
+    {
+        IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
+        List<Token> tokens = new ArrayList<>(values.size());
+        for (Integer val : values)
+            tokens.add(partitioner.getToken(ByteBufferUtil.bytes(val)));
+
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+        tmd.clearUnsafe();
+        tmd.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
+
+        return tokens;
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org