Repository: cassandra Updated Branches: refs/heads/cassandra-2.2 1143bc113 -> f8912ce93 refs/heads/cassandra-3.0 4e23c9e4d -> 489c2f695 refs/heads/cassandra-3.11 ea62d8862 -> bba0d03e9 refs/heads/trunk 4cb83cb81 -> 5cc68a873
Stop SSTables being lost from compaction strategy after full repairs patch by Kurt Greaves; reviewed by Stefan Podkowinski, Marcus Eriksson, for CASSANDRA-14423 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8912ce9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8912ce9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8912ce9 Branch: refs/heads/cassandra-2.2 Commit: f8912ce9329a8bc360e93cf61e56814135fbab39 Parents: 1143bc1 Author: kurt <k...@instaclustr.com> Authored: Thu Jun 14 10:59:19 2018 +0000 Committer: Mick Semb Wever <m...@apache.org> Committed: Fri Jun 29 16:49:53 2018 +1000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 70 ++++++----- .../db/compaction/AntiCompactionTest.java | 120 ++++++++++++++++++- 3 files changed, 156 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7b1089e..9d6a9ea 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.13 + * Fix bug that prevented compaction of SSTables after full repairs (CASSANDRA-14423) * Incorrect counting of pending messages in OutboundTcpConnection (CASSANDRA-11551) * Fix compaction failure caused by reading un-flushed data (CASSANDRA-12743) * Use Bounds instead of Range for sstables in anticompaction (CASSANDRA-14411) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 419f66e..013fc04 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -460,6 +460,16 @@ public class CompactionManager implements CompactionManagerMBean }, jobs, OperationType.CLEANUP); } + /** + * Submit anti-compactions for a collection of SSTables over a set of repaired ranges and marks corresponding SSTables + * as repaired. + * + * @param cfs Column family for anti-compaction + * @param ranges Repaired ranges to be anti-compacted into separate SSTables. + * @param sstables {@link Refs} of SSTables within CF to anti-compact. + * @param repairedAt Unix timestamp of when repair was completed. + * @return Futures executing anti-compaction. + */ public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs, final Collection<Range<Token>> ranges, final Refs<SSTableReader> sstables, @@ -475,6 +485,8 @@ public class CompactionManager implements CompactionManagerMBean { for (SSTableReader compactingSSTable : cfs.getTracker().getCompacting()) sstables.releaseIfHolds(compactingSSTable); + // We don't anti-compact any SSTable that has been compacted during repair as it may have been compacted + // with unrepaired data. Set<SSTableReader> compactedSSTables = new HashSet<>(); for (SSTableReader sstable : sstables) if (sstable.isMarkedCompacted()) @@ -504,9 +516,17 @@ public class CompactionManager implements CompactionManagerMBean * * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getActiveRepairedSSTableRefs(..)). * + * NOTE: Repairs can take place on both unrepaired (incremental + full) and repaired (full) data. + * Although anti-compaction could work on repaired sstables as well and would result in having more accurate + * repairedAt values for these, we avoid anti-compacting already repaired sstables, as we currently don't + * make use of any actual repairedAt value and splitting up sstables just for that is not worth it. However, we will + * still update repairedAt if the SSTable is fully contained within the repaired ranges, as this does not require + * anticompaction. + * * @param cfs * @param ranges Ranges that the repair was carried out on * @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them. + * @param txn Transaction across all SSTables that were repaired. * @throws InterruptedException * @throws IOException */ @@ -519,13 +539,7 @@ public class CompactionManager implements CompactionManagerMBean logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size()); logger.trace("Starting anticompaction for ranges {}", ranges); Set<SSTableReader> sstables = new HashSet<>(validatedForRepair); - Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); - // we should only notify that repair status changed if it actually did: - Set<SSTableReader> mutatedRepairStatusToNotify = new HashSet<>(); - Map<SSTableReader, Boolean> wasRepairedBefore = new HashMap<>(); - for (SSTableReader sstable : sstables) - wasRepairedBefore.put(sstable, sstable.isRepaired()); - + Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); // SSTables that were completely repaired only Set<SSTableReader> nonAnticompacting = new HashSet<>(); Iterator<SSTableReader> sstableIterator = sstables.iterator(); @@ -536,6 +550,11 @@ public class CompactionManager implements CompactionManagerMBean while (sstableIterator.hasNext()) { SSTableReader sstable = sstableIterator.next(); + List<String> anticompactRanges = new ArrayList<>(); + // We don't anti-compact SSTables already marked repaired. See CASSANDRA-13153 + // and CASSANDRA-14423. + if (sstable.isRepaired()) // We never anti-compact already repaired SSTables + nonAnticompacting.add(sstable); Bounds<Token> sstableBounds = new Bounds<>(sstable.first.getToken(), sstable.last.getToken()); @@ -548,28 +567,30 @@ public class CompactionManager implements CompactionManagerMBean logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r); sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt); sstable.reloadSSTableMetadata(); - mutatedRepairStatuses.add(sstable); - if (!wasRepairedBefore.get(sstable)) - mutatedRepairStatusToNotify.add(sstable); + if (!nonAnticompacting.contains(sstable)) // don't notify if the SSTable was already repaired + mutatedRepairStatuses.add(sstable); sstableIterator.remove(); shouldAnticompact = true; break; } - else if (r.intersects(sstableBounds)) + else if (r.intersects(sstableBounds) && !nonAnticompacting.contains(sstable)) { - logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableBounds, r); + anticompactRanges.add(r.toString()); shouldAnticompact = true; } } + if (!anticompactRanges.isEmpty()) + logger.info("SSTable {} ({}) will be anticompacted on ranges: {}", sstable, sstableBounds, String.join(", ", anticompactRanges)); + if (!shouldAnticompact) { - logger.info("SSTable {} ({}) does not intersect repaired ranges {}, not touching repairedAt.", sstable, sstableBounds, normalizedRanges); + logger.info("SSTable {} ({}) not subject to anticompaction of repaired ranges {}, not touching repairedAt.", sstable, sstableBounds, normalizedRanges); nonAnticompacting.add(sstable); sstableIterator.remove(); } } - cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatusToNotify); + cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses); txn.cancel(Sets.union(nonAnticompacting, mutatedRepairStatuses)); validatedForRepair.release(Sets.union(nonAnticompacting, mutatedRepairStatuses)); assert txn.originals().equals(sstables); @@ -1223,24 +1244,11 @@ public class CompactionManager implements CompactionManagerMBean */ private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, LifecycleTransaction repaired, long repairedAt) { - logger.info("Performing anticompaction on {} sstables", repaired.originals().size()); + int numAnticompact = repaired.originals().size(); + logger.info("Performing anticompaction on {} sstables", numAnticompact); //Group SSTables - Set<SSTableReader> sstables = repaired.originals(); - - // Repairs can take place on both unrepaired (incremental + full) and repaired (full) data. - // Although anti-compaction could work on repaired sstables as well and would result in having more accurate - // repairedAt values for these, we still avoid anti-compacting already repaired sstables, as we currently don't - // make use of any actual repairedAt value and splitting up sstables just for that is not worth it at this point. - Set<SSTableReader> unrepairedSSTables = ImmutableSet.copyOf(Iterables.filter(sstables, new Predicate<SSTableReader>() - { - public boolean apply(SSTableReader input) - { - return !input.isRepaired(); - } - })); - - Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(unrepairedSSTables); + Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repaired.originals()); // iterate over sstables to check if the repaired / unrepaired ranges intersect them. int antiCompactedSSTableCount = 0; for (Collection<SSTableReader> sstableGroup : groupedSSTables) @@ -1253,7 +1261,7 @@ public class CompactionManager implements CompactionManagerMBean } String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s)."; - logger.info(format, repaired.originals().size(), antiCompactedSSTableCount); + logger.info(format, numAnticompact, antiCompactedSSTableCount); } private int antiCompactGroup(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index c451516..abd9a4a 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@ -29,13 +29,20 @@ import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ExecutionException; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.locator.SimpleStrategy; + +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.ListenableFuture; import org.junit.BeforeClass; import org.junit.After; import org.junit.Test; @@ -55,8 +62,6 @@ import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; -import com.google.common.collect.Iterables; - public class AntiCompactionTest { private static final String KEYSPACE1 = "AntiCompactionTest"; @@ -271,9 +276,116 @@ public class AntiCompactionTest CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1); } + SSTableReader sstable = Iterables.get(store.getSSTables(), 0); assertThat(store.getSSTables().size(), is(1)); - assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(true)); - assertThat(Iterables.get(store.getSSTables(), 0).selfRef().globalCount(), is(1)); + assertThat(sstable.isRepaired(), is(true)); + assertThat(sstable.selfRef().globalCount(), is(1)); + assertThat(store.getTracker().getCompacting().size(), is(0)); + } + + @Test + public void shouldAntiCompactSSTable() throws IOException, InterruptedException, ExecutionException + { + ColumnFamilyStore store = prepareColumnFamilyStore(); + Collection<SSTableReader> sstables = store.getUnrepairedSSTables(); + assertEquals(store.getSSTables().size(), sstables.size()); + // SSTable range is 0 - 10, repair just a subset of the ranges (0 - 4) of the SSTable. Should result in + // one repaired and one unrepaired SSTable + Range<Token> range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("4".getBytes())); + List<Range<Token>> ranges = Arrays.asList(range); + + try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); + Refs<SSTableReader> refs = Refs.ref(sstables)) + { + CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1); + } + + Comparator<SSTableReader> generationReverseComparator = new Comparator<SSTableReader>() + { + public int compare(SSTableReader o1, SSTableReader o2) + { + return Integer.compare(o1.descriptor.generation, o2.descriptor.generation); + } + }; + + SortedSet<SSTableReader> sstablesSorted = new TreeSet<>(generationReverseComparator); + sstablesSorted.addAll(store.getSSTables()); + + SSTableReader sstable = sstablesSorted.first(); + assertThat(store.getSSTables().size(), is(2)); + assertThat(sstable.isRepaired(), is(true)); + assertThat(sstable.selfRef().globalCount(), is(1)); + assertThat(store.getTracker().getCompacting().size(), is(0)); + + // Test we don't anti-compact already repaired SSTables. repairedAt shouldn't change for the already repaired SSTable (first) + sstables = store.getSSTables(); + // Range that's a subset of the repaired SSTable's ranges, so would cause an anti-compaction (if it wasn't repaired) + range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("2".getBytes())); + ranges = Arrays.asList(range); + try (Refs<SSTableReader> refs = Refs.ref(sstables)) + { + // use different repairedAt to ensure it doesn't change + ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200); + fut.get(); + } + + sstablesSorted.clear(); + sstablesSorted.addAll(store.getSSTables()); + assertThat(sstablesSorted.size(), is(2)); + assertThat(sstablesSorted.first().isRepaired(), is(true)); + assertThat(sstablesSorted.last().isRepaired(), is(false)); + assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(1L)); + assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(0L)); + assertThat(sstablesSorted.first().selfRef().globalCount(), is(1)); + assertThat(sstablesSorted.last().selfRef().globalCount(), is(1)); + assertThat(store.getTracker().getCompacting().size(), is(0)); + + // Test repairing all the ranges of the repaired SSTable. Should mutate repairedAt without anticompacting, + // but leave the unrepaired SSTable as is. + range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("4".getBytes())); + ranges = Arrays.asList(range); + + try (Refs<SSTableReader> refs = Refs.ref(sstables)) + { + // Same repaired at, but should be changed on the repaired SSTable now + ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200); + fut.get(); + } + + sstablesSorted.clear(); + sstablesSorted.addAll(store.getSSTables()); + + assertThat(sstablesSorted.size(), is(2)); + assertThat(sstablesSorted.first().isRepaired(), is(true)); + assertThat(sstablesSorted.last().isRepaired(), is(false)); + assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(200L)); + assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(0L)); + assertThat(sstablesSorted.first().selfRef().globalCount(), is(1)); + assertThat(sstablesSorted.last().selfRef().globalCount(), is(1)); + assertThat(store.getTracker().getCompacting().size(), is(0)); + + // Repair whole range. Should mutate repairedAt on repaired SSTable (again) and + // mark unrepaired SSTable as repaired + range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("999".getBytes())); + ranges = Arrays.asList(range); + + try (Refs<SSTableReader> refs = Refs.ref(sstables)) + { + // Both SSTables should have repairedAt of 400 + ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 400); + fut.get(); + } + + sstablesSorted.clear(); + sstablesSorted.addAll(store.getSSTables()); + + assertThat(sstablesSorted.size(), is(2)); + assertThat(sstablesSorted.first().isRepaired(), is(true)); + assertThat(sstablesSorted.last().isRepaired(), is(true)); + assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(400L)); + assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(400L)); + assertThat(sstablesSorted.first().selfRef().globalCount(), is(1)); + assertThat(sstablesSorted.last().selfRef().globalCount(), is(1)); assertThat(store.getTracker().getCompacting().size(), is(0)); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org