Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/99e0c907 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/99e0c907 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/99e0c907 Branch: refs/heads/cassandra-3.0 Commit: 99e0c907eabc26f876f984daf33fdc2d3ab66a24 Parents: 507ed14 3aa7308 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Fri Aug 14 21:27:08 2015 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Fri Aug 14 21:27:08 2015 +0300 ---------------------------------------------------------------------- .../apache/cassandra/db/ColumnFamilyStore.java | 2 +- .../cassandra/db/SizeEstimatesRecorder.java | 3 ++- .../org/apache/cassandra/db/lifecycle/View.java | 20 ++++++++++++---- .../apache/cassandra/dht/AbstractBounds.java | 25 ++++++++++++++++++++ src/java/org/apache/cassandra/dht/Bounds.java | 2 +- .../apache/cassandra/dht/ExcludingBounds.java | 2 +- .../cassandra/dht/IncludingExcludingBounds.java | 2 +- .../cassandra/streaming/StreamSession.java | 16 +++++++++---- .../apache/cassandra/db/lifecycle/ViewTest.java | 8 ++++--- 9 files changed, 63 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 797e2c7,8bda6b2..8d72ecf --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -1179,7 -1394,7 +1179,7 @@@ public class ColumnFamilyStore implemen Set<SSTableReader> results = null; for (SSTableReader sstable : sstables) { - Set<SSTableReader> overlaps = ImmutableSet.copyOf(view.sstablesInBounds(sstableSet, AbstractBounds.bounds(sstable.first, true, sstable.last, true))); - Set<SSTableReader> overlaps = ImmutableSet.copyOf(tree.search(Interval.<RowPosition, SSTableReader>create(sstable.first, sstable.last))); ++ Set<SSTableReader> overlaps = ImmutableSet.copyOf(view.sstablesInBounds(sstableSet, sstable.first, sstable.last)); results = results == null ? overlaps : Sets.union(results, overlaps).immutableCopy(); } results = Sets.difference(results, ImmutableSet.copyOf(sstables)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/db/lifecycle/View.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/lifecycle/View.java index 75590fa,73ba131..7ee0fdf --- a/src/java/org/apache/cassandra/db/lifecycle/View.java +++ b/src/java/org/apache/cassandra/db/lifecycle/View.java @@@ -176,41 -126,19 +176,53 @@@ public class Vie return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting); } - public Iterable<SSTableReader> sstablesInBounds(SSTableSet sstableSet, AbstractBounds<PartitionPosition> rowBounds) + /** - * Returns the sstables that have any partition between {@code left} and {@code right}, when both bounds are taken inclusively. - * The interval formed by {@code left} and {@code right} shouldn't wrap. - */ - public List<SSTableReader> sstablesInBounds(RowPosition left, RowPosition right) ++ * Returns the sstables that have any partition between {@code left} and {@code right}, when both bounds are taken inclusively. ++ * The interval formed by {@code left} and {@code right} shouldn't wrap. ++ */ ++ public Iterable<SSTableReader> sstablesInBounds(SSTableSet sstableSet, PartitionPosition left, PartitionPosition right) { + assert !AbstractBounds.strictlyWrapsAround(left, right); + if (intervalTree.isEmpty()) return Collections.emptyList(); - PartitionPosition stopInTree = rowBounds.right.isMinimum() ? intervalTree.max() : rowBounds.right; - return select(sstableSet, intervalTree.search(Interval.create(rowBounds.left, stopInTree))); + - RowPosition stopInTree = right.isMinimum() ? intervalTree.max() : right; - return intervalTree.search(Interval.<RowPosition, SSTableReader>create(left, stopInTree)); ++ PartitionPosition stopInTree = right.isMinimum() ? intervalTree.max() : right; ++ return select(sstableSet, intervalTree.search(Interval.create(left, stopInTree))); + } + + public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet) + { + return (view) -> view.sstables(sstableSet); + } + + public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, Predicate<SSTableReader> filter) + { + return (view) -> view.sstables(sstableSet, filter); + } + + /** + * @return a ViewFragment containing the sstables and memtables that may need to be merged + * for the given @param key, according to the interval tree + */ + public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, DecoratedKey key) + { + assert sstableSet == SSTableSet.LIVE; + return (view) -> view.intervalTree.search(key); + } + + /** + * @return a ViewFragment containing the sstables and memtables that may need to be merged + * for rows within @param rowBounds, inclusive, according to the interval tree. + */ + public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, AbstractBounds<PartitionPosition> rowBounds) + { - return (view) -> view.sstablesInBounds(sstableSet, rowBounds); ++ // Note that View.sstablesInBounds always includes it's bound while rowBounds may not. This is ok however ++ // because the fact we restrict the sstables returned by this function is an optimization in the first ++ // place and the returned sstables will (almost) never cover *exactly* rowBounds anyway. It's also ++ // *very* unlikely that a sstable is included *just* because we consider one of the bound inclusively ++ // instead of exclusively, so the performance impact is negligible in practice. ++ return (view) -> view.sstablesInBounds(sstableSet, rowBounds.left, rowBounds.right); } // METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW: http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/dht/AbstractBounds.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/dht/Bounds.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/dht/ExcludingBounds.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java index 861528b,55d7e68..bb5be1e --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@@ -35,8 -35,7 +35,7 @@@ import org.slf4j.LoggerFactory import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.db.PartitionPosition; - import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.*; @@@ -320,23 -313,40 +319,30 @@@ public class StreamSession implements I { for (ColumnFamilyStore cfStore : stores) { - final List<AbstractBounds<PartitionPosition>> rowBoundsList = new ArrayList<>(ranges.size()); - final List<Range<RowPosition>> keyRanges = new ArrayList<>(ranges.size()); ++ final List<Range<PartitionPosition>> keyRanges = new ArrayList<>(ranges.size()); for (Range<Token> range : ranges) - rowBoundsList.add(Range.makeRowRange(range)); + keyRanges.add(Range.makeRowRange(range)); - refs.addAll(cfStore.selectAndReference(new Function<View, List<SSTableReader>>() - { - public List<SSTableReader> apply(View view) + refs.addAll(cfStore.selectAndReference(view -> { + Set<SSTableReader> sstables = Sets.newHashSet(); - for (AbstractBounds<PartitionPosition> rowBounds : rowBoundsList) ++ for (Range<PartitionPosition> keyRange : keyRanges) { - for (SSTableReader sstable : view.sstablesInBounds(SSTableSet.CANONICAL, rowBounds)) - Map<SSTableReader, SSTableReader> permittedInstances = new HashMap<>(); - for (SSTableReader reader : ColumnFamilyStore.CANONICAL_SSTABLES.apply(view)) - permittedInstances.put(reader, reader); - - Set<SSTableReader> sstables = Sets.newHashSet(); - for (Range<RowPosition> keyRange : keyRanges) ++ // keyRange excludes its start, while sstableInBounds is inclusive (of both start and end). ++ // This is fine however, because keyRange has been created from a token range through Range.makeRowRange (see above). ++ // And that later method uses the Token.maxKeyBound() method to creates the range, which return a "fake" key that ++ // sort after all keys having the token. That "fake" key cannot however be equal to any real key, so that even ++ // including keyRange.left will still exclude any key having the token of the original token range, and so we're ++ // still actually selecting what we wanted. ++ for (SSTableReader sstable : view.sstablesInBounds(SSTableSet.CANONICAL, keyRange.left, keyRange.right)) { - // keyRange excludes its start, while sstableInBounds is inclusive (of both start and end). - // This is fine however, because keyRange has been created from a token range through Range.makeRowRange (see above). - // And that later method uses the Token.maxKeyBound() method to creates the range, which return a "fake" key that - // sort after all keys having the token. That "fake" key cannot however be equal to any real key, so that even - // including keyRange.left will still exclude any key having the token of the original token range, and so we're - // still actually selecting what we wanted. - for (SSTableReader sstable : view.sstablesInBounds(keyRange.left, keyRange.right)) - { - // sstableInBounds may contain early opened sstables - if (isIncremental && sstable.isRepaired()) - continue; - sstable = permittedInstances.get(sstable); - if (sstable != null) - sstables.add(sstable); - } ++ // sstableInBounds may contain early opened sstables + if (!isIncremental || !sstable.isRepaired()) + sstables.add(sstable); } - - logger.debug("ViewFilter for {}/{} sstables", sstables.size(), view.sstables.size()); - return ImmutableList.copyOf(sstables); } + + if (logger.isDebugEnabled()) + logger.debug("ViewFilter for {}/{} sstables", sstables.size(), Iterables.size(view.sstables(SSTableSet.CANONICAL))); + return sstables; }).refs); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java index 27d426a,32a81e2..40afa54 --- a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java @@@ -61,15 -59,16 +61,17 @@@ public class ViewTes { for (int j = i ; j < 5 ; j++) { - RowPosition min = MockSchema.readerBounds(i); - RowPosition max = MockSchema.readerBounds(j); + PartitionPosition min = MockSchema.readerBounds(i); + PartitionPosition max = MockSchema.readerBounds(j); - for (boolean minInc : new boolean[] { true, false} ) + for (boolean minInc : new boolean[] { true })//, false} ) { - for (boolean maxInc : new boolean[] { true, false} ) + for (boolean maxInc : new boolean[] { true })//, false} ) { if (i == j && !(minInc && maxInc)) continue; - List<SSTableReader> r = ImmutableList.copyOf(initialView.sstablesInBounds(SSTableSet.LIVE, AbstractBounds.bounds(min, minInc, max, maxInc))); - AbstractBounds<RowPosition> bounds = AbstractBounds.bounds(min, minInc, max, maxInc); - List<SSTableReader> r = initialView.sstablesInBounds(bounds.left, bounds.right); ++ ++ AbstractBounds<PartitionPosition> bounds = AbstractBounds.bounds(min, minInc, max, maxInc); ++ List<SSTableReader> r = ImmutableList.copyOf(initialView.sstablesInBounds(SSTableSet.LIVE,bounds.left, bounds.right)); Assert.assertEquals(String.format("%d(%s) %d(%s)", i, minInc, j, maxInc), j - i + (minInc ? 0 : -1) + (maxInc ? 1 : 0), r.size()); } }