Create interval tree over canonical sstables to avoid missing sstables during streaming
patch by marcuse; reviewed by benedict for CASSANDRA-11886


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/72acbcd0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/72acbcd0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/72acbcd0

Branch: refs/heads/cassandra-2.2
Commit: 72acbcd00fe7c46e54cd267f42868531e99e39df
Parents: 68319f7
Author: Marcus Eriksson <marc...@apache.org>
Authored: Wed May 25 08:38:14 2016 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Mon Jun 13 14:31:47 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  4 ++
 .../org/apache/cassandra/db/DataTracker.java    |  8 ++-
 .../cassandra/streaming/StreamSession.java      | 21 +++---
 .../io/sstable/SSTableRewriterTest.java         | 72 ++++++++++++++++++++
 5 files changed, 93 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/72acbcd0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index af641e1..ebcc90c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.15
+ * Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886)
  * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
  * Updated cqlsh Python driver to fix DESCRIBE problem for legacy tables (CASSANDRA-11055)
  * cqlsh: apply current keyspace to source command (CASSANDRA-11152)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/72acbcd0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 166ce7e..559ba0b 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1531,6 +1531,10 @@ public class DatabaseDescriptor
     {
         return conf.sstable_preemptive_open_interval_in_mb;
     }
+    public static void setSSTablePreempiveOpenIntervalInMB(int mb)
+    {
+        conf.sstable_preemptive_open_interval_in_mb = mb;
+    }
 
     public static boolean getTrickleFsync()
     {
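The setter added above is what the new test at the end of this patch uses to shrink sstable_preemptive_open_interval_in_mb, so that a major compaction opens its output early (incrementally) while it is still running, and to reset the interval afterwards. A condensed sketch of that pattern, assuming an already-open ColumnFamilyStore named cfs (the 1 MB and 50 MB values are simply the ones the test happens to use, not required constants):

    // Force incremental (early) opening during the compaction, then restore the interval.
    DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1);
    try
    {
        cfs.forceMajorCompaction();
    }
    finally
    {
        DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(50);
    }
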
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72acbcd0/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index c731a35..927e717 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.IndexSummary;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
@@ -810,9 +811,14 @@ public class DataTracker
 
         public List<SSTableReader> sstablesInBounds(AbstractBounds<RowPosition> rowBounds)
         {
+            return sstablesInBounds(rowBounds, intervalTree, liveMemtables.get(0).cfs.partitioner);
+        }
+
+        public static List<SSTableReader> sstablesInBounds(AbstractBounds<RowPosition> rowBounds, SSTableIntervalTree intervalTree, IPartitioner partitioner)
+        {
             if (intervalTree.isEmpty())
                 return Collections.emptyList();
-            RowPosition stopInTree = rowBounds.right.isMinimum(liveMemtables.get(0).cfs.partitioner) ? intervalTree.max() : rowBounds.right;
+            RowPosition stopInTree = rowBounds.right.isMinimum(partitioner) ? intervalTree.max() : rowBounds.right;
             return intervalTree.search(Interval.<RowPosition, SSTableReader>create(rowBounds.left, stopInTree));
         }
     }
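With this change, sstablesInBounds is also available as a static helper that takes the interval tree and partitioner as parameters, so a caller can run the range query against a tree it built itself (for example, one covering only canonical sstables) instead of the live view's tree. A minimal usage sketch; ranges, cfs and canonical (a referenced collection of SSTableReader) are assumed to exist and are not part of the patch:

    // Build an interval tree over an explicit set of readers and query it per range.
    DataTracker.SSTableIntervalTree tree = DataTracker.buildIntervalTree(canonical);
    for (Range<Token> range : ranges)
    {
        AbstractBounds<RowPosition> rowBounds = range.toRowBounds();
        List<SSTableReader> hits = DataTracker.View.sstablesInBounds(rowBounds, tree, cfs.partitioner);
        // hits can only contain readers that were in the set the tree was built from
    }
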
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72acbcd0/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 4eb8557..273631c 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.Nullable;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.*;
 import org.slf4j.Logger;
@@ -38,6 +39,8 @@ import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
@@ -295,7 +298,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         return stores;
     }
 
-    private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, final boolean isIncremental)
+    @VisibleForTesting
+    public static List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, final boolean isIncremental)
     {
         Refs<SSTableReader> refs = new Refs<>();
         try
@@ -303,30 +307,23 @@ public class StreamSession implements IEndpointStateChangeSubscriber
             for (ColumnFamilyStore cfStore : stores)
             {
                 final List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
+                final IPartitioner partitioner = cfStore.partitioner;
                 for (Range<Token> range : ranges)
                     rowBoundsList.add(range.toRowBounds());
                 refs.addAll(cfStore.selectAndReference(new Function<DataTracker.View, List<SSTableReader>>()
                 {
                     public List<SSTableReader> apply(DataTracker.View view)
                     {
-                        Map<SSTableReader, SSTableReader> permittedInstances = new HashMap<>();
-                        for (SSTableReader reader : ColumnFamilyStore.CANONICAL_SSTABLES.apply(view))
-                            permittedInstances.put(reader, reader);
-
+                        DataTracker.SSTableIntervalTree intervalTree = DataTracker.buildIntervalTree(ColumnFamilyStore.CANONICAL_SSTABLES.apply(view));
                         Set<SSTableReader> sstables = Sets.newHashSet();
                         for (AbstractBounds<RowPosition> rowBounds : rowBoundsList)
                         {
-                            // sstableInBounds may contain early opened sstables
-                            for (SSTableReader sstable : view.sstablesInBounds(rowBounds))
+                            for (SSTableReader sstable : DataTracker.View.sstablesInBounds(rowBounds, intervalTree, partitioner))
                             {
-                                if (isIncremental && sstable.isRepaired())
-                                    continue;
-                                sstable = permittedInstances.get(sstable);
-                                if (sstable != null)
+                                if (!isIncremental || !sstable.isRepaired())
                                     sstables.add(sstable);
                             }
                         }
-
                         return ImmutableList.copyOf(sstables);
                     }
                 }).refs);
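Because the interval tree is now built from ColumnFamilyStore.CANONICAL_SSTABLES before the query runs, only canonical readers can come back from sstablesInBounds, so the old permittedInstances lookup (which silently dropped an sstable whenever the mapping missed) goes away, and the only remaining filter is skipping already-repaired sstables on incremental streams. The method is also made public static and @VisibleForTesting so the new unit test can drive it directly; any caller doing so must release the references the returned sections hold. A small calling sketch mirroring the test below, where range and cfs are assumed to exist:

    // Select the streaming sections for one range of one table, then drop the sstable refs.
    List<StreamSession.SSTableStreamingSections> sections =
        StreamSession.getSSTableSectionsForRanges(Collections.singleton(range), Collections.singleton(cfs), 0L, false);
    assertEquals(1, sections.size());   // the test expects sections from exactly one sstable
    for (StreamSession.SSTableStreamingSections section : sections)
        section.ref.release();          // release the reference taken by selectAndReference
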
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72acbcd0/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index a735657..1fb28f5 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -21,13 +21,16 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Sets;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
@@ -39,7 +42,11 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.notifications.INotification;
+import org.apache.cassandra.notifications.INotificationConsumer;
+import org.apache.cassandra.notifications.SSTableListChangedNotification;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
@@ -767,6 +774,71 @@ public class SSTableRewriterTest extends SchemaLoader
         truncate(cfs);
     }
 
+    @Test
+    public void testSSTableSectionsForRanges() throws IOException, InterruptedException, ExecutionException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        truncate(cfs);
+
+        cfs.addSSTable(writeFile(cfs, 1000));
+
+        Collection<SSTableReader> allSSTables = cfs.getSSTables();
+        assertEquals(1, allSSTables.size());
+        final Token firstToken = allSSTables.iterator().next().first.getToken();
+        DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1);
+
+        List<StreamSession.SSTableStreamingSections> sectionsBeforeRewrite = StreamSession.getSSTableSectionsForRanges(
+            Collections.singleton(new Range<Token>(firstToken, firstToken)),
+            Collections.singleton(cfs), 0L, false);
+        assertEquals(1, sectionsBeforeRewrite.size());
+        for (StreamSession.SSTableStreamingSections section : sectionsBeforeRewrite)
+            section.ref.release();
+        final AtomicInteger checkCount = new AtomicInteger();
+        // needed since we get notified when compaction is done as well - we can't get sections for ranges for obsoleted sstables
+        INotificationConsumer consumer = new INotificationConsumer()
+        {
+            public void handleNotification(INotification notification, Object sender)
+            {
+                if (notification instanceof SSTableListChangedNotification)
+                {
+                    Collection<SSTableReader> added = ((SSTableListChangedNotification) notification).added;
+                    Collection<SSTableReader> removed = ((SSTableListChangedNotification) notification).removed;
+                    // note that we need to check if added.equals(removed) because once the compaction is done the old sstable will have
+                    // selfRef().globalCount() == 0 and we can't get the SectionsForRanges then. During incremental opening we always add and remove the same
+                    // sstable (note that the sstables are x.equals(y) but not x == y, since the new one will be a new instance with a moved starting point).
+                    // In this case we must avoid trying to call getSSTableSectionsForRanges since we are in the notification
+                    // method and trying to reference an sstable with globalCount() == 0 puts it into a loop, and this blocks the tracker from removing the
+                    // unreferenced sstable.
+                    if (added.isEmpty() || !added.iterator().next().getColumnFamilyName().equals(cfs.getColumnFamilyName()) || !added.equals(removed))
+                        return;
+
+                    // at no point must the rewrite process hide
+                    // sections returned by getSSTableSectionsForRanges
+                    Set<Range<Token>> range = Collections.singleton(new Range<Token>(firstToken, firstToken));
+                    List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), 0L, false);
+                    assertEquals(1, sections.size());
+                    for (StreamSession.SSTableStreamingSections section : sections)
+                        section.ref.release();
+                    checkCount.incrementAndGet();
+                }
+            }
+        };
+        cfs.getDataTracker().subscribe(consumer);
+        try
+        {
+            cfs.forceMajorCompaction();
+            // reset
+        }
+        finally
+        {
+            DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(50);
+            cfs.getDataTracker().unsubscribe(consumer);
+        }
+        assertTrue(checkCount.get() >= 2);
+        truncate(cfs);
+    }
+
     /**
      * emulates anticompaction - writing from one source sstable to two new sstables
      *