Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/73a8341f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/73a8341f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/73a8341f Branch: refs/heads/cassandra-3.0 Commit: 73a8341fef25de7236bc591e84cddc637c0b7b2f Parents: 3d211e9 05f8a00 Author: Marcus Eriksson <marc...@apache.org> Authored: Mon Jun 13 15:14:28 2016 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Mon Jun 13 15:14:28 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/config/DatabaseDescriptor.java | 4 ++ .../org/apache/cassandra/db/lifecycle/View.java | 11 +++ .../cassandra/streaming/StreamSession.java | 18 +++-- .../io/sstable/SSTableRewriterTest.java | 75 +++++++++++++++++--- 5 files changed, 88 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/73a8341f/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 47aef7e,491f72a..8a04077 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,28 -1,5 +1,29 @@@ -2.2.7 +3.0.8 + * Add TimeWindowCompactionStrategy (CASSANDRA-9666) +Merged from 2.2: * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984) +Merged from 2.1: ++ * Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886) + * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749) + + +3.0.7 + * Fix legacy serialization of Thrift-generated non-compound range tombstones + when communicating with 2.x nodes (CASSANDRA-11930) + * Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849) + * Avoid referencing DatabaseDescriptor in AbstractType (CASSANDRA-11912) + * Fix sstables not being protected from removal during index build (CASSANDRA-11905) + * cqlsh: Suppress stack trace from Read/WriteFailures (CASSANDRA-11032) + * Remove unneeded code to repair index summaries that have + been improperly down-sampled (CASSANDRA-11127) + * Avoid WriteTimeoutExceptions during commit log replay due to materialized + view lock contention (CASSANDRA-11891) + * Prevent OOM failures on SSTable corruption, improve tests for corruption detection (CASSANDRA-9530) + * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705) + * Allow compaction strategies to disable early open (CASSANDRA-11754) + * Refactor Materialized View code (CASSANDRA-11475) + * Update Java Driver (CASSANDRA-11615) +Merged from 2.2: * Persist local metadata earlier in startup sequence (CASSANDRA-11742) * Run CommitLog tests with different compression settings (CASSANDRA-9039) * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664) http://git-wip-us.apache.org/repos/asf/cassandra/blob/73a8341f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/73a8341f/src/java/org/apache/cassandra/db/lifecycle/View.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/lifecycle/View.java index 17062b4,e303801..99903fc --- a/src/java/org/apache/cassandra/db/lifecycle/View.java +++ b/src/java/org/apache/cassandra/db/lifecycle/View.java @@@ -179,52 -131,23 +179,63 @@@ public class Vie } /** - * Returns the sstables that have any partition between {@code left} and {@code right}, when both bounds are taken inclusively. - * The interval formed by {@code left} and {@code right} shouldn't wrap. - */ - public List<SSTableReader> sstablesInBounds(RowPosition left, RowPosition right) + * Returns the sstables that have any partition between {@code left} and {@code right}, when both bounds are taken inclusively. + * The interval formed by {@code left} and {@code right} shouldn't wrap. + */ + public Iterable<SSTableReader> sstablesInBounds(SSTableSet sstableSet, PartitionPosition left, PartitionPosition right) { - return sstablesInBounds(left, right, intervalTree); + assert !AbstractBounds.strictlyWrapsAround(left, right); + + if (intervalTree.isEmpty()) + return Collections.emptyList(); + + PartitionPosition stopInTree = right.isMinimum() ? intervalTree.max() : right; + return select(sstableSet, intervalTree.search(Interval.create(left, stopInTree))); } - public static List<SSTableReader> sstablesInBounds(RowPosition left, RowPosition right, SSTableIntervalTree intervalTree) ++ public static List<SSTableReader> sstablesInBounds(PartitionPosition left, PartitionPosition right, SSTableIntervalTree intervalTree) + { + assert !AbstractBounds.strictlyWrapsAround(left, right); + + if (intervalTree.isEmpty()) + return Collections.emptyList(); + - RowPosition stopInTree = right.isMinimum() ? intervalTree.max() : right; - return intervalTree.search(Interval.<RowPosition, SSTableReader>create(left, stopInTree)); ++ PartitionPosition stopInTree = right.isMinimum() ? intervalTree.max() : right; ++ return intervalTree.search(Interval.create(left, stopInTree)); ++ } ++ + public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet) + { + return (view) -> view.sstables(sstableSet); + } + + public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, Predicate<SSTableReader> filter) + { + return (view) -> view.sstables(sstableSet, filter); + } + + /** + * @return a ViewFragment containing the sstables and memtables that may need to be merged + * for the given @param key, according to the interval tree + */ + public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, DecoratedKey key) + { + assert sstableSet == SSTableSet.LIVE; + return (view) -> view.intervalTree.search(key); + } + + /** + * @return a ViewFragment containing the sstables and memtables that may need to be merged + * for rows within @param rowBounds, inclusive, according to the interval tree. + */ + public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, AbstractBounds<PartitionPosition> rowBounds) + { + // Note that View.sstablesInBounds always includes it's bound while rowBounds may not. This is ok however + // because the fact we restrict the sstables returned by this function is an optimization in the first + // place and the returned sstables will (almost) never cover *exactly* rowBounds anyway. It's also + // *very* unlikely that a sstable is included *just* because we consider one of the bound inclusively + // instead of exclusively, so the performance impact is negligible in practice. + return (view) -> view.sstablesInBounds(sstableSet, rowBounds.left, rowBounds.right); } // METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW: http://git-wip-us.apache.org/repos/asf/cassandra/blob/73a8341f/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java index 2ed6ad1,f4c900e..d5c060e --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@@ -25,10 -25,12 +25,13 @@@ import java.util.* import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; + import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.collect.*; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.lifecycle.SSTableSet; + import org.apache.cassandra.db.lifecycle.SSTableIntervalTree; + import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -320,30 -319,33 +324,24 @@@ public class StreamSession implements I { for (ColumnFamilyStore cfStore : stores) { - final List<Range<RowPosition>> keyRanges = new ArrayList<>(ranges.size()); + final List<Range<PartitionPosition>> keyRanges = new ArrayList<>(ranges.size()); for (Range<Token> range : ranges) keyRanges.add(Range.makeRowRange(range)); - refs.addAll(cfStore.selectAndReference(new Function<View, List<SSTableReader>>() - { - public List<SSTableReader> apply(View view) + refs.addAll(cfStore.selectAndReference(view -> { + Set<SSTableReader> sstables = Sets.newHashSet(); ++ SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.sstables(SSTableSet.CANONICAL)); + for (Range<PartitionPosition> keyRange : keyRanges) { - // keyRange excludes its start, while sstableInBounds is inclusive (of both start and end). - // This is fine however, because keyRange has been created from a token range through Range.makeRowRange (see above). - // And that later method uses the Token.maxKeyBound() method to creates the range, which return a "fake" key that - // sort after all keys having the token. That "fake" key cannot however be equal to any real key, so that even - // including keyRange.left will still exclude any key having the token of the original token range, and so we're - // still actually selecting what we wanted. - for (SSTableReader sstable : view.sstablesInBounds(SSTableSet.CANONICAL, keyRange.left, keyRange.right)) - SSTableIntervalTree intervalTree = SSTableIntervalTree.build(ColumnFamilyStore.CANONICAL_SSTABLES.apply(view)); - Set<SSTableReader> sstables = Sets.newHashSet(); - for (Range<RowPosition> keyRange : keyRanges) ++ for (SSTableReader sstable : View.sstablesInBounds(keyRange.left, keyRange.right, intervalTree)) { - // sstableInBounds may contain early opened sstables - // keyRange excludes its start, while sstableInBounds is inclusive (of both start and end). - // This is fine however, because keyRange has been created from a token range through Range.makeRowRange (see above). - // And that later method uses the Token.maxKeyBound() method to creates the range, which return a "fake" key that - // sort after all keys having the token. That "fake" key cannot however be equal to any real key, so that even - // including keyRange.left will still exclude any key having the token of the original token range, and so we're - // still actually selecting what we wanted. - for (SSTableReader sstable : View.sstablesInBounds(keyRange.left, keyRange.right, intervalTree)) - { - if (!isIncremental || !sstable.isRepaired()) - sstables.add(sstable); - } + if (!isIncremental || !sstable.isRepaired()) + sstables.add(sstable); } - - logger.debug("ViewFilter for {}/{} sstables", sstables.size(), view.sstables.size()); - return ImmutableList.copyOf(sstables); } + + if (logger.isDebugEnabled()) + logger.debug("ViewFilter for {}/{} sstables", sstables.size(), Iterables.size(view.sstables(SSTableSet.CANONICAL))); + return sstables; }).refs); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/73a8341f/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 008df06,f50953a..18bc760 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@@ -20,48 -19,49 +20,42 @@@ package org.apache.cassandra.io.sstable import java.io.File; import java.io.IOException; --import java.nio.ByteBuffer; import java.util.*; - import java.util.concurrent.ThreadLocalRandom; + import java.util.concurrent.ExecutionException; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; --import org.junit.After; --import org.junit.AfterClass; --import org.junit.BeforeClass; import com.google.common.util.concurrent.Uninterruptibles; import org.junit.Test; --import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; -import org.apache.cassandra.config.Config; +import org.apache.cassandra.UpdateBuilder; - import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.KSMetaData; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.compaction.AbstractCompactedRow; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.SerializationHeader; ++import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; import org.apache.cassandra.db.compaction.CompactionController; -import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.compaction.LazilyCompactedRow; +import org.apache.cassandra.db.compaction.CompactionIterator; import org.apache.cassandra.db.compaction.OperationType; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.sstable.format.SSTableWriter; -import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.db.compaction.SSTableSplitter; -import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.partitions.ImmutableBTreePartition; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.db.lifecycle.SSTableSet; - import org.apache.cassandra.db.lifecycle.View; - import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.format.SSTableReader; - import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.metrics.StorageMetrics; - import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.notifications.INotification; -import org.apache.cassandra.notifications.INotificationConsumer; -import org.apache.cassandra.notifications.SSTableListChangedNotification; -import org.apache.cassandra.service.StorageService; + import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair;