Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05f8a008 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05f8a008 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05f8a008 Branch: refs/heads/cassandra-2.2 Commit: 05f8a008f696d9624ec85176fa0e2a1ce06a1ad5 Parents: 593bbf5 72acbcd Author: Marcus Eriksson <marc...@apache.org> Authored: Mon Jun 13 14:34:01 2016 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Mon Jun 13 15:00:08 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/config/DatabaseDescriptor.java | 4 ++ .../org/apache/cassandra/db/lifecycle/View.java | 5 ++ .../cassandra/streaming/StreamSession.java | 22 +++---- .../io/sstable/SSTableRewriterTest.java | 66 ++++++++++++++++++++ 5 files changed, 86 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/05f8a008/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index d639d43,ebcc90c..491f72a --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,31 -1,7 +1,32 @@@ -2.1.15 +2.2.7 + * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984) + * Persist local metadata earlier in startup sequence (CASSANDRA-11742) + * Run CommitLog tests with different compression settings (CASSANDRA-9039) + * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664) + * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587) + * Fix possible race condition in CommitLog.recover (CASSANDRA-11743) + * Enable client encryption in sstableloader with cli options (CASSANDRA-11708) + * Possible memory leak in NIODataInputStream (CASSANDRA-11867) + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669) + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753) + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395) + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626) + * Exit JVM if JMX server fails to startup (CASSANDRA-11540) + * Produce a heap dump when exiting on OOM (CASSANDRA-9861) + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427) + * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510) + * JSON datetime formatting needs timezone (CASSANDRA-11137) + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502) + * Remove unnescessary file existence check during anticompaction (CASSANDRA-11660) + * Add missing files to debian packages (CASSANDRA-11642) + * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621) + * cqlsh: COPY FROM should use regular inserts for single statement batches and + report errors correctly if workers processes crash on initialization (CASSANDRA-11474) + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553) + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988) +Merged from 2.1: + * Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886) * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749) - * Updated cqlsh Python driver to fix DESCRIBE problem for legacy tables (CASSANDRA-11055) * cqlsh: apply current keyspace to source command (CASSANDRA-11152) * Backport CASSANDRA-11578 (CASSANDRA-11750) * Clear out parent repair session if repair coordinator dies (CASSANDRA-11824) http://git-wip-us.apache.org/repos/asf/cassandra/blob/05f8a008/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java index e24917c,559ba0b..d3a5028 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@@ -1588,8 -1529,12 +1588,12 @@@ public class DatabaseDescripto public static int getSSTablePreempiveOpenIntervalInMB() { - return conf.sstable_preemptive_open_interval_in_mb; + return FBUtilities.isWindows() ? -1 : conf.sstable_preemptive_open_interval_in_mb; } + public static void setSSTablePreempiveOpenIntervalInMB(int mb) + { + conf.sstable_preemptive_open_interval_in_mb = mb; + } public static boolean getTrickleFsync() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/05f8a008/src/java/org/apache/cassandra/db/lifecycle/View.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/lifecycle/View.java index fba1627,0000000..e303801 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/lifecycle/View.java +++ b/src/java/org/apache/cassandra/db/lifecycle/View.java @@@ -1,281 -1,0 +1,286 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.lifecycle; + +import java.util.*; + +import javax.annotation.Nullable; + +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.base.Predicate; +import com.google.common.collect.*; + +import org.apache.cassandra.db.Memtable; +import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.Interval; + +import static com.google.common.base.Predicates.equalTo; +import static com.google.common.base.Predicates.not; +import static com.google.common.collect.ImmutableList.copyOf; +import static com.google.common.collect.ImmutableList.of; +import static com.google.common.collect.Iterables.all; +import static com.google.common.collect.Iterables.concat; +import static com.google.common.collect.Iterables.filter; +import static java.util.Collections.singleton; +import static org.apache.cassandra.db.lifecycle.Helpers.emptySet; +import static org.apache.cassandra.db.lifecycle.Helpers.replace; + +/** + * An immutable structure holding the current memtable, the memtables pending + * flush, the sstables for a column family, and the sstables that are active + * in compaction (a subset of the sstables). + * + * Modifications to instances are all performed via a Function produced by the static methods in this class. + * These are composed as necessary and provided to the Tracker.apply() methods, which atomically reject or + * accept and apply the changes to the View. + * + */ +public class View +{ + /** + * ordinarily a list of size 1, but when preparing to flush will contain both the memtable we will flush + * and the new replacement memtable, until all outstanding write operations on the old table complete. + * The last item in the list is always the "current" memtable. + */ + public final List<Memtable> liveMemtables; + /** + * contains all memtables that are no longer referenced for writing and are queued for / in the process of being + * flushed. In chronologically ascending order. + */ + public final List<Memtable> flushingMemtables; + public final Set<SSTableReader> compacting; + public final Set<SSTableReader> sstables; + public final Set<SSTableReader> premature; + // we use a Map here so that we can easily perform identity checks as well as equality checks. + // When marking compacting, we now indicate if we expect the sstables to be present (by default we do), + // and we then check that not only are they all present in the live set, but that the exact instance present is + // the one we made our decision to compact against. + public final Map<SSTableReader, SSTableReader> sstablesMap; + + public final SSTableIntervalTree intervalTree; + + View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Set<SSTableReader> compacting, Set<SSTableReader> premature, SSTableIntervalTree intervalTree) + { + assert liveMemtables != null; + assert flushingMemtables != null; + assert sstables != null; + assert compacting != null; + assert intervalTree != null; + + this.liveMemtables = liveMemtables; + this.flushingMemtables = flushingMemtables; + + this.sstablesMap = sstables; + this.sstables = sstablesMap.keySet(); + this.compacting = compacting; + this.premature = premature; + this.intervalTree = intervalTree; + } + + public Memtable getCurrentMemtable() + { + return liveMemtables.get(liveMemtables.size() - 1); + } + + /** + * @return the active memtable and all the memtables that are pending flush. + */ + public Iterable<Memtable> getAllMemtables() + { + return concat(flushingMemtables, liveMemtables); + } + + public Sets.SetView<SSTableReader> nonCompactingSStables() + { + return Sets.difference(sstables, compacting); + } + + public Iterable<SSTableReader> getUncompacting(Iterable<SSTableReader> candidates) + { + return filter(candidates, new Predicate<SSTableReader>() + { + public boolean apply(SSTableReader sstable) + { + return !compacting.contains(sstable); + } + }); + } + + @Override + public String toString() + { + return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting); + } + + /** + * Returns the sstables that have any partition between {@code left} and {@code right}, when both bounds are taken inclusively. + * The interval formed by {@code left} and {@code right} shouldn't wrap. + */ + public List<SSTableReader> sstablesInBounds(RowPosition left, RowPosition right) + { ++ return sstablesInBounds(left, right, intervalTree); ++ } ++ ++ public static List<SSTableReader> sstablesInBounds(RowPosition left, RowPosition right, SSTableIntervalTree intervalTree) ++ { + assert !AbstractBounds.strictlyWrapsAround(left, right); + + if (intervalTree.isEmpty()) + return Collections.emptyList(); + + RowPosition stopInTree = right.isMinimum() ? intervalTree.max() : right; + return intervalTree.search(Interval.<RowPosition, SSTableReader>create(left, stopInTree)); + } + + // METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW: + + // return a function to un/mark the provided readers compacting in a view + static Function<View, View> updateCompacting(final Set<SSTableReader> unmark, final Iterable<SSTableReader> mark) + { + if (unmark.isEmpty() && Iterables.isEmpty(mark)) + return Functions.identity(); + return new Function<View, View>() + { + public View apply(View view) + { + assert all(mark, Helpers.idIn(view.sstablesMap)); + return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap, + replace(view.compacting, unmark, mark), + view.premature, view.intervalTree); + } + }; + } + + // construct a predicate to reject views that do not permit us to mark these readers compacting; + // i.e. one of them is either already compacting, has been compacted, or has been replaced + static Predicate<View> permitCompacting(final Iterable<SSTableReader> readers) + { + return new Predicate<View>() + { + public boolean apply(View view) + { + for (SSTableReader reader : readers) + if (view.compacting.contains(reader) || view.sstablesMap.get(reader) != reader || reader.isMarkedCompacted() || view.premature.contains(reader)) + return false; + return true; + } + }; + } + + // construct a function to change the liveset in a Snapshot + static Function<View, View> updateLiveSet(final Set<SSTableReader> remove, final Iterable<SSTableReader> add) + { + if (remove.isEmpty() && Iterables.isEmpty(add)) + return Functions.identity(); + return new Function<View, View>() + { + public View apply(View view) + { + Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, remove, add); + return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compacting, view.premature, + SSTableIntervalTree.build(sstableMap.keySet())); + } + }; + } + + // called prior to initiating flush: add newMemtable to liveMemtables, making it the latest memtable + static Function<View, View> switchMemtable(final Memtable newMemtable) + { + return new Function<View, View>() + { + public View apply(View view) + { + List<Memtable> newLive = ImmutableList.<Memtable>builder().addAll(view.liveMemtables).add(newMemtable).build(); + assert newLive.size() == view.liveMemtables.size() + 1; + return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compacting, view.premature, view.intervalTree); + } + }; + } + + // called before flush: move toFlush from liveMemtables to flushingMemtables + static Function<View, View> markFlushing(final Memtable toFlush) + { + return new Function<View, View>() + { + public View apply(View view) + { + List<Memtable> live = view.liveMemtables, flushing = view.flushingMemtables; + List<Memtable> newLive = copyOf(filter(live, not(equalTo(toFlush)))); + List<Memtable> newFlushing = copyOf(concat(filter(flushing, lessThan(toFlush)), + of(toFlush), + filter(flushing, not(lessThan(toFlush))))); + assert newLive.size() == live.size() - 1; + assert newFlushing.size() == flushing.size() + 1; + return new View(newLive, newFlushing, view.sstablesMap, view.compacting, view.premature, view.intervalTree); + } + }; + } + + // called after flush: removes memtable from flushingMemtables, and inserts flushed into the live sstable set + static Function<View, View> replaceFlushed(final Memtable memtable, final SSTableReader flushed) + { + return new Function<View, View>() + { + public View apply(View view) + { + List<Memtable> flushingMemtables = copyOf(filter(view.flushingMemtables, not(equalTo(memtable)))); + assert flushingMemtables.size() == view.flushingMemtables.size() - 1; + + if (flushed == null) + return new View(view.liveMemtables, flushingMemtables, view.sstablesMap, + view.compacting, view.premature, view.intervalTree); + + Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), singleton(flushed)); + Set<SSTableReader> compacting = replace(view.compacting, emptySet(), singleton(flushed)); + Set<SSTableReader> premature = replace(view.premature, emptySet(), singleton(flushed)); + return new View(view.liveMemtables, flushingMemtables, sstableMap, compacting, premature, + SSTableIntervalTree.build(sstableMap.keySet())); + } + }; + } + + static Function<View, View> permitCompactionOfFlushed(final SSTableReader reader) + { + return new Function<View, View>() + { + + @Nullable + public View apply(View view) + { + Set<SSTableReader> premature = ImmutableSet.copyOf(filter(view.premature, not(equalTo(reader)))); + Set<SSTableReader> compacting = ImmutableSet.copyOf(filter(view.compacting, not(equalTo(reader)))); + return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap, compacting, premature, view.intervalTree); + } + }; + } + + + private static <T extends Comparable<T>> Predicate<T> lessThan(final T lessThan) + { + return new Predicate<T>() + { + public boolean apply(T t) + { + return t.compareTo(lessThan) < 0; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05f8a008/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java index f14b94c,273631c..f4c900e --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@@ -25,18 -25,22 +25,22 @@@ import java.util.* import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; -import javax.annotation.Nullable; - + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.*; + ++import org.apache.cassandra.db.lifecycle.SSTableIntervalTree; +import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DataTracker; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.RowPosition; + import org.apache.cassandra.dht.AbstractBounds; + import org.apache.cassandra.dht.Bounds; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.*; @@@ -279,7 -270,7 +283,7 @@@ public class StreamSession implements I flushSSTables(stores); List<Range<Token>> normalizedRanges = Range.normalize(ranges); - List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt); - List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt, repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE); ++ List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt, isIncremental); try { addTransferFiles(sections); @@@ -314,33 -306,21 +319,26 @@@ { for (ColumnFamilyStore cfStore : stores) { - final List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size()); - final IPartitioner partitioner = cfStore.partitioner; + final List<Range<RowPosition>> keyRanges = new ArrayList<>(ranges.size()); for (Range<Token> range : ranges) - rowBoundsList.add(range.toRowBounds()); - refs.addAll(cfStore.selectAndReference(new Function<DataTracker.View, List<SSTableReader>>() + keyRanges.add(Range.makeRowRange(range)); + refs.addAll(cfStore.selectAndReference(new Function<View, List<SSTableReader>>() { - public List<SSTableReader> apply(DataTracker.View view) + public List<SSTableReader> apply(View view) { - Map<SSTableReader, SSTableReader> permittedInstances = new HashMap<>(); - for (SSTableReader reader : ColumnFamilyStore.CANONICAL_SSTABLES.apply(view)) - permittedInstances.put(reader, reader); - - DataTracker.SSTableIntervalTree intervalTree = DataTracker.buildIntervalTree(ColumnFamilyStore.CANONICAL_SSTABLES.apply(view)); ++ SSTableIntervalTree intervalTree = SSTableIntervalTree.build(ColumnFamilyStore.CANONICAL_SSTABLES.apply(view)); Set<SSTableReader> sstables = Sets.newHashSet(); - for (AbstractBounds<RowPosition> rowBounds : rowBoundsList) + for (Range<RowPosition> keyRange : keyRanges) { - for (SSTableReader sstable : DataTracker.View.sstablesInBounds(rowBounds, intervalTree, partitioner)) + // keyRange excludes its start, while sstableInBounds is inclusive (of both start and end). + // This is fine however, because keyRange has been created from a token range through Range.makeRowRange (see above). + // And that later method uses the Token.maxKeyBound() method to creates the range, which return a "fake" key that + // sort after all keys having the token. That "fake" key cannot however be equal to any real key, so that even + // including keyRange.left will still exclude any key having the token of the original token range, and so we're + // still actually selecting what we wanted. - for (SSTableReader sstable : view.sstablesInBounds(keyRange.left, keyRange.right)) ++ for (SSTableReader sstable : View.sstablesInBounds(keyRange.left, keyRange.right, intervalTree)) { - // sstableInBounds may contain early opened sstables - if (isIncremental && sstable.isRepaired()) - continue; - sstable = permittedInstances.get(sstable); - if (sstable != null) + if (!isIncremental || !sstable.isRepaired()) sstables.add(sstable); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/05f8a008/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index c2cc6e3,1fb28f5..f50953a --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@@ -21,15 -21,11 +21,18 @@@ import java.io.File import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; + import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; ++import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import com.google.common.util.concurrent.Uninterruptibles; import org.junit.Test; import org.apache.cassandra.SchemaLoader; @@@ -41,25 -35,24 +44,29 @@@ import org.apache.cassandra.db.* import org.apache.cassandra.db.compaction.AbstractCompactedRow; import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; import org.apache.cassandra.db.compaction.CompactionController; +import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.LazilyCompactedRow; import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.db.compaction.SSTableSplitter; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.metrics.StorageMetrics; + import org.apache.cassandra.notifications.INotification; + import org.apache.cassandra.notifications.INotificationConsumer; + import org.apache.cassandra.notifications.SSTableListChangedNotification; import org.apache.cassandra.service.StorageService; + import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class SSTableRewriterTest extends SchemaLoader { @@@ -807,10 -769,76 +814,69 @@@ } } } - writer.abort(); - cfs.getDataTracker().unmarkCompacting(sstables); - truncate(cfs); + truncateCF(); + validateCFS(cfs); } + @Test + public void testSSTableSectionsForRanges() throws IOException, InterruptedException, ExecutionException + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); + truncate(cfs); + + cfs.addSSTable(writeFile(cfs, 1000)); + + Collection<SSTableReader> allSSTables = cfs.getSSTables(); + assertEquals(1, allSSTables.size()); + final Token firstToken = allSSTables.iterator().next().first.getToken(); + DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1); + + List<StreamSession.SSTableStreamingSections> sectionsBeforeRewrite = StreamSession.getSSTableSectionsForRanges( + Collections.singleton(new Range<Token>(firstToken, firstToken)), + Collections.singleton(cfs), 0L, false); + assertEquals(1, sectionsBeforeRewrite.size()); + for (StreamSession.SSTableStreamingSections section : sectionsBeforeRewrite) + section.ref.release(); + final AtomicInteger checkCount = new AtomicInteger(); + // needed since we get notified when compaction is done as well - we can't get sections for ranges for obsoleted sstables - INotificationConsumer consumer = new INotificationConsumer() ++ final AtomicBoolean done = new AtomicBoolean(false); ++ final AtomicBoolean failed = new AtomicBoolean(false); ++ Runnable r = new Runnable() ++ { ++ public void run() ++ { ++ while (!done.get()) + { - public void handleNotification(INotification notification, Object sender) - { - if (notification instanceof SSTableListChangedNotification) - { - Collection<SSTableReader> added = ((SSTableListChangedNotification) notification).added; - Collection<SSTableReader> removed = ((SSTableListChangedNotification) notification).removed; - // note that we need to check if added.equals(removed) because once the compaction is done the old sstable will have - // selfRef().globalCount() == 0 and we cant get the SectionsForRanges then. During incremental opening we always add and remove the same - // sstable (note that the sstables are x.equal(y) but not x == y since the new one will be a new instance with a moved starting point - // In this case we must avoid trying to call getSSTableSectionsForRanges since we are in the notification - // method and trying to reference an sstable with globalcount == 0 puts it into a loop, and this blocks the tracker from removing the - // unreferenced sstable. - if (added.isEmpty() || !added.iterator().next().getColumnFamilyName().equals(cfs.getColumnFamilyName()) || !added.equals(removed)) - return; - - // at no point must the rewrite process hide - // sections returned by getSSTableSectionsForRanges - Set<Range<Token>> range = Collections.singleton(new Range<Token>(firstToken, firstToken)); - List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), 0L, false); - assertEquals(1, sections.size()); - for (StreamSession.SSTableStreamingSections section : sections) - section.ref.release(); - checkCount.incrementAndGet(); - } - } - }; - cfs.getDataTracker().subscribe(consumer); ++ Set<Range<Token>> range = Collections.singleton(new Range<Token>(firstToken, firstToken)); ++ List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), 0L, false); ++ if (sections.size() != 1) ++ failed.set(true); ++ for (StreamSession.SSTableStreamingSections section : sections) ++ section.ref.release(); ++ checkCount.incrementAndGet(); ++ Uninterruptibles.sleepUninterruptibly(5, TimeUnit.MILLISECONDS); ++ } ++ } ++ }; ++ Thread t = new Thread(r); + try + { ++ t.start(); + cfs.forceMajorCompaction(); + // reset + } + finally + { + DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(50); - cfs.getDataTracker().unsubscribe(consumer); ++ done.set(true); ++ t.join(20); + } ++ assertFalse(failed.get()); + assertTrue(checkCount.get() >= 2); + truncate(cfs); + } + /** * emulates anticompaction - writing from one source sstable to two new sstables *