Avoid missing sstables when getting the canonical sstables. Patch by marcuse; reviewed by Stefania Alborghetti for CASSANDRA-11996
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bc23632f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bc23632f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bc23632f Branch: refs/heads/cassandra-3.8 Commit: bc23632f201f760147d8bd1fbee68533fc3f6dfa Parents: 5b0566a Author: Marcus Eriksson <marc...@apache.org> Authored: Mon Jun 13 15:29:08 2016 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Wed Jul 6 07:57:24 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 6 +-- .../cassandra/db/SizeEstimatesRecorder.java | 7 +++- .../apache/cassandra/db/lifecycle/Tracker.java | 2 +- .../org/apache/cassandra/db/lifecycle/View.java | 30 +++++++------- .../apache/cassandra/db/view/ViewBuilder.java | 4 +- .../cassandra/index/SecondaryIndexManager.java | 2 +- .../index/internal/CassandraIndex.java | 2 +- .../io/sstable/IndexSummaryManager.java | 2 +- .../cassandra/streaming/StreamSession.java | 4 +- .../apache/cassandra/db/lifecycle/ViewTest.java | 6 +-- .../index/internal/CustomCassandraIndex.java | 2 +- .../io/sstable/SSTableRewriterTest.java | 41 +++++++++++++++++++- 13 files changed, 78 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 99ac3ad..b3063b4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.9 + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996) * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944) * Fix column ordering of results with static columns for Thrift requests in a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 1be3175..b95e88d 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1482,7 +1482,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public Iterable<SSTableReader> getSSTables(SSTableSet sstableSet) { - return data.getView().sstables(sstableSet); + return data.getView().select(sstableSet); } public Iterable<SSTableReader> getUncompactingSSTables() @@ -1916,7 +1916,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public Iterable<DecoratedKey> keySamples(Range<Token> range) { - try (RefViewFragment view = selectAndReference(View.select(SSTableSet.CANONICAL))) + try (RefViewFragment view = selectAndReference(View.selectFunction(SSTableSet.CANONICAL))) { Iterable<DecoratedKey>[] samples = new Iterable[view.sstables.size()]; int i = 0; @@ -1930,7 +1930,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public long estimatedKeysForRange(Range<Token> range) { - try (RefViewFragment view = selectAndReference(View.select(SSTableSet.CANONICAL))) + try (RefViewFragment view = selectAndReference(View.selectFunction(SSTableSet.CANONICAL))) { long count = 0; for (SSTableReader sstable : view.sstables) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java index 3461aef..0b31b87 100644 --- a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java +++ 
b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.db.lifecycle.SSTableIntervalTree; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.dht.Range; @@ -103,8 +104,10 @@ public class SizeEstimatesRecorder extends MigrationListener implements Runnable { while (refs == null) { - // note that this is not guaranteed to return all sstables within the ranges, but for an estimated size, that should be fine - Iterable<SSTableReader> canonicalSSTables = table.getTracker().getView().select(SSTableSet.CANONICAL, table.select(View.selectLive(Range.makeRowRange(range))).sstables); + Iterable<SSTableReader> sstables = table.getTracker().getView().select(SSTableSet.CANONICAL); + SSTableIntervalTree tree = SSTableIntervalTree.build(sstables); + Range<PartitionPosition> r = Range.makeRowRange(range); + Iterable<SSTableReader> canonicalSSTables = View.sstablesInBounds(r.left, r.right, tree); refs = Refs.tryRef(canonicalSSTables); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/db/lifecycle/Tracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java index 16090a1..c94b88f 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@ -401,7 +401,7 @@ public class Tracker public Iterable<SSTableReader> getUncompacting() { - return view.get().sstables(SSTableSet.NONCOMPACTING); + return view.get().select(SSTableSet.NONCOMPACTING); } public Iterable<SSTableReader> getUncompacting(Iterable<SSTableReader> candidates) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/db/lifecycle/View.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java index 96aaa49..3fa197f 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/View.java +++ b/src/java/org/apache/cassandra/db/lifecycle/View.java @@ -118,14 +118,9 @@ public class View return sstables; } - public Iterable<SSTableReader> sstables(SSTableSet sstableSet) - { - return select(sstableSet, sstables); - } - public Iterable<SSTableReader> sstables(SSTableSet sstableSet, Predicate<SSTableReader> filter) { - return select(sstableSet, filter(sstables, filter)); + return filter(select(sstableSet), filter); } // any sstable known by this tracker in any form; we have a special method here since it's only used for testing/debug @@ -136,7 +131,7 @@ public class View return Iterables.concat(sstables, filterOut(compacting, sstables)); } - public Iterable<SSTableReader> select(SSTableSet sstableSet, Iterable<SSTableReader> sstables) + public Iterable<SSTableReader> select(SSTableSet sstableSet) { switch (sstableSet) { @@ -145,9 +140,18 @@ public class View case NONCOMPACTING: return filter(sstables, (s) -> !compacting.contains(s)); case CANONICAL: - return transform(filter(sstables, - (s) -> s.openReason != SSTableReader.OpenReason.EARLY), - (s) -> s.openReason != SSTableReader.OpenReason.MOVED_START ? s : compactingMap.get(s)); + Set<SSTableReader> canonicalSSTables = new HashSet<>(); + for (SSTableReader sstable : compacting) + if (sstable.openReason != SSTableReader.OpenReason.EARLY) + canonicalSSTables.add(sstable); + // reason for checking if compacting contains the sstable is that if compacting has an EARLY version + // of a NORMAL sstable, we still have the canonical version of that sstable in sstables. 
+ // note that the EARLY version is equal, but not == since it is a different instance of the same sstable. + for (SSTableReader sstable : sstables) + if (!compacting.contains(sstable) && sstable.openReason != SSTableReader.OpenReason.EARLY) + canonicalSSTables.add(sstable); + + return canonicalSSTables; default: throw new IllegalStateException(); } @@ -190,7 +194,7 @@ public class View return Collections.emptyList(); PartitionPosition stopInTree = right.isMinimum() ? intervalTree.max() : right; - return select(SSTableSet.LIVE, intervalTree.search(Interval.create(left, stopInTree))); + return intervalTree.search(Interval.create(left, stopInTree)); } public static List<SSTableReader> sstablesInBounds(PartitionPosition left, PartitionPosition right, SSTableIntervalTree intervalTree) @@ -204,9 +208,9 @@ public class View return intervalTree.search(Interval.create(left, stopInTree)); } - public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet) + public static Function<View, Iterable<SSTableReader>> selectFunction(SSTableSet sstableSet) { - return (view) -> view.sstables(sstableSet); + return (view) -> view.select(sstableSet); } public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, Predicate<SSTableReader> filter) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/db/view/ViewBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java index b2b409b..b55eda0 100644 --- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java +++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java @@ -108,7 +108,7 @@ public class ViewBuilder extends CompactionInfo.Holder if (buildStatus == null) { baseCfs.forceBlockingFlush(); - function = org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL); + function = 
org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL); int generation = Integer.MIN_VALUE; try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs) @@ -129,7 +129,7 @@ public class ViewBuilder extends CompactionInfo.Holder @Nullable public Iterable<SSTableReader> apply(org.apache.cassandra.db.lifecycle.View view) { - Iterable<SSTableReader> readers = org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL).apply(view); + Iterable<SSTableReader> readers = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL).apply(view); if (readers != null) return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left); return null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index 9635c59..6dfdeee 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -271,7 +271,7 @@ public class SecondaryIndexManager implements IndexRegistry { if (index.shouldBuildBlocking()) { - try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL)); + try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL)); Refs<SSTableReader> sstables = viewFragment.refs) { buildIndexesBlocking(sstables, Collections.singleton(index)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/index/internal/CassandraIndex.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java index 9d997a7..2a0dec0 100644 --- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java +++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java @@ -699,7 +699,7 @@ public abstract class CassandraIndex implements Index { baseCfs.forceBlockingFlush(); - try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL)); + try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL)); Refs<SSTableReader> sstables = viewFragment.refs) { if (sstables.isEmpty()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java index aed35c9..ddda430 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java @@ -208,7 +208,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean do { View view = cfStore.getTracker().getView(); - allSSTables = ImmutableSet.copyOf(view.sstables(SSTableSet.CANONICAL)); + allSSTables = ImmutableSet.copyOf(view.select(SSTableSet.CANONICAL)); nonCompacting = ImmutableSet.copyOf(view.getUncompacting(allSSTables)); } while (null == (txn = cfStore.getTracker().tryModify(nonCompacting, OperationType.UNKNOWN))); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java 
b/src/java/org/apache/cassandra/streaming/StreamSession.java index a14f815..bfbedc7 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -329,7 +329,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber keyRanges.add(Range.makeRowRange(range)); refs.addAll(cfStore.selectAndReference(view -> { Set<SSTableReader> sstables = Sets.newHashSet(); - SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.sstables(SSTableSet.CANONICAL)); + SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.select(SSTableSet.CANONICAL)); for (Range<PartitionPosition> keyRange : keyRanges) { // keyRange excludes its start, while sstableInBounds is inclusive (of both start and end). @@ -346,7 +346,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber } if (logger.isDebugEnabled()) - logger.debug("ViewFilter for {}/{} sstables", sstables.size(), Iterables.size(view.sstables(SSTableSet.CANONICAL))); + logger.debug("ViewFilter for {}/{} sstables", sstables.size(), Iterables.size(view.select(SSTableSet.CANONICAL))); return sstables; }).refs); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java index 98f9300..a5dceca 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java @@ -100,8 +100,8 @@ public class ViewTest Assert.assertFalse(View.permitCompacting(readers.subList(1, 2)).apply(cur)); Assert.assertTrue(readers.subList(2, 5).containsAll(copyOf(cur.getUncompacting(readers)))); Assert.assertEquals(3, copyOf(cur.getUncompacting(readers)).size()); - 
Assert.assertTrue(ImmutableSet.copyOf(cur.sstables(SSTableSet.NONCOMPACTING)).containsAll(readers.subList(2, 5))); - Assert.assertEquals(3, ImmutableSet.copyOf(cur.sstables(SSTableSet.NONCOMPACTING)).size()); + Assert.assertTrue(ImmutableSet.copyOf(cur.select(SSTableSet.NONCOMPACTING)).containsAll(readers.subList(2, 5))); + Assert.assertEquals(3, ImmutableSet.copyOf(cur.select(SSTableSet.NONCOMPACTING)).size()); // check marking already compacting readers fails with an exception testFailure(View.updateCompacting(emptySet(), readers.subList(0, 1)), cur); @@ -129,7 +129,7 @@ public class ViewTest testFailure(View.updateCompacting(copyOf(readers.subList(0, 2)), emptySet()), cur); Assert.assertTrue(copyOf(concat(readers.subList(0, 1), readers.subList(2, 5))).containsAll(copyOf(cur.getUncompacting(readers)))); Assert.assertEquals(4, copyOf(cur.getUncompacting(readers)).size()); - Set<SSTableReader> nonCompacting = ImmutableSet.copyOf(cur.sstables(SSTableSet.NONCOMPACTING)); + Set<SSTableReader> nonCompacting = ImmutableSet.copyOf(cur.select(SSTableSet.NONCOMPACTING)); Assert.assertTrue(nonCompacting.containsAll(readers.subList(2, 5))); Assert.assertTrue(nonCompacting.containsAll(readers.subList(0, 1))); Assert.assertEquals(4, nonCompacting.size()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java index 6aaefb7..2124abe 100644 --- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java +++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java @@ -623,7 +623,7 @@ public class CustomCassandraIndex implements Index { baseCfs.forceBlockingFlush(); - try (ColumnFamilyStore.RefViewFragment viewFragment = 
baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL)); + try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL)); Refs<SSTableReader> sstables = viewFragment.refs) { if (sstables.isEmpty()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 18bc760..c842b7f 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -759,7 +759,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase if (!checked && writer.currentWriter().getFilePointer() > 15000000) { checked = true; - ColumnFamilyStore.ViewFragment viewFragment = cfs.select(View.select(SSTableSet.CANONICAL)); + ColumnFamilyStore.ViewFragment viewFragment = cfs.select(View.selectFunction(SSTableSet.CANONICAL)); // canonical view should have only one SSTable which is not opened early. 
assertEquals(1, viewFragment.sstables.size()); SSTableReader sstable = viewFragment.sstables.get(0); @@ -872,6 +872,45 @@ public class SSTableRewriterTest extends SSTableWriterTestBase validateCFS(cfs); } + @Test + public void testCanonicalSSTables() throws ExecutionException, InterruptedException + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); + truncate(cfs); + + cfs.addSSTable(writeFile(cfs, 100)); + Collection<SSTableReader> allSSTables = cfs.getSSTables(); + assertEquals(1, allSSTables.size()); + final AtomicBoolean done = new AtomicBoolean(false); + final AtomicBoolean failed = new AtomicBoolean(false); + Runnable r = () -> { + while (!done.get()) + { + Iterable<SSTableReader> sstables = cfs.getSSTables(SSTableSet.CANONICAL); + if (Iterables.size(sstables) != 1) + { + failed.set(true); + return; + } + } + }; + Thread t = new Thread(r); + try + { + t.start(); + cfs.forceMajorCompaction(); + } + finally + { + done.set(true); + t.join(20); + } + assertFalse(failed.get()); + + + } + private void validateKeys(Keyspace ks) { for (int i = 0; i < 100; i++)