Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/489c2f69 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/489c2f69 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/489c2f69 Branch: refs/heads/cassandra-3.11 Commit: 489c2f69510b001770d9a59e55ba5d5175019050 Parents: 4e23c9e f8912ce Author: Mick Semb Wever <m...@apache.org> Authored: Fri Jun 29 16:53:36 2018 +1000 Committer: Mick Semb Wever <m...@apache.org> Committed: Fri Jun 29 16:57:34 2018 +1000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 66 ++++++----- .../db/compaction/AntiCompactionTest.java | 109 ++++++++++++++++++- 3 files changed, 147 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/489c2f69/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index aeeb0ae,9d6a9ea..d694f3b --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,34 -1,5 +1,35 @@@ -2.2.13 +3.0.17 + * Always close RT markers returned by ReadCommand#executeLocally() (CASSANDRA-14515) + * Reverse order queries with range tombstones can cause data loss (CASSANDRA-14513) + * Fix regression of lagging commitlog flush log message (CASSANDRA-14451) + * Add Missing dependencies in pom-all (CASSANDRA-14422) + * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447) + * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121) + * Cassandra not starting when using enhanced startup scripts in windows (CASSANDRA-14418) + * Fix progress stats and units in compactionstats (CASSANDRA-12244) + * Better handle missing partition columns in system_schema.columns (CASSANDRA-14379) + * Delay hints store excise by write timeout to avoid race with decommission (CASSANDRA-13740) 
+ * Deprecate background repair and probabilistic read_repair_chance table options + (CASSANDRA-13910) + * Add missed CQL keywords to documentation (CASSANDRA-14359) + * Fix unbounded validation compactions on repair / revert CASSANDRA-13797 (CASSANDRA-14332) + * Avoid deadlock when running nodetool refresh before node is fully up (CASSANDRA-14310) + * Handle all exceptions when opening sstables (CASSANDRA-14202) + * Handle incompletely written hint descriptors during startup (CASSANDRA-14080) + * Handle repeat open bound from SRP in read repair (CASSANDRA-14330) + * Use zero as default score in DynamicEndpointSnitch (CASSANDRA-14252) + * Respect max hint window when hinting for LWT (CASSANDRA-14215) + * Adding missing WriteType enum values to v3, v4, and v5 spec (CASSANDRA-13697) + * Don't regenerate bloomfilter and summaries on startup (CASSANDRA-11163) + * Fix NPE when performing comparison against a null frozen in LWT (CASSANDRA-14087) + * Log when SSTables are deleted (CASSANDRA-14302) + * Fix batch commitlog sync regression (CASSANDRA-14292) + * Write to pending endpoint when view replica is also base replica (CASSANDRA-14251) + * Chain commit log marker potential performance regression in batch commit mode (CASSANDRA-14194) + * Fully utilise specified compaction threads (CASSANDRA-14210) + * Pre-create deletion log records to finish compactions quicker (CASSANDRA-12763) +Merged from 2.2: + * Fix bug that prevented compaction of SSTables after full repairs (CASSANDRA-14423) * Incorrect counting of pending messages in OutboundTcpConnection (CASSANDRA-11551) * Fix compaction failure caused by reading un-flushed data (CASSANDRA-12743) * Use Bounds instead of Range for sstables in anticompaction (CASSANDRA-14411) http://git-wip-us.apache.org/repos/asf/cassandra/blob/489c2f69/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc 
src/java/org/apache/cassandra/db/compaction/CompactionManager.java index ab363e0,013fc04..f033bf2 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -474,6 -460,16 +474,17 @@@ public class CompactionManager implemen }, jobs, OperationType.CLEANUP); } + /** + * Submit anti-compactions for a collection of SSTables over a set of repaired ranges and marks corresponding SSTables + * as repaired. + * + * @param cfs Column family for anti-compaction + * @param ranges Repaired ranges to be anti-compacted into separate SSTables. + * @param sstables {@link Refs} of SSTables within CF to anti-compact. + * @param repairedAt Unix timestamp of when repair was completed. ++ * @param parentRepairSession Corresponding repair session + * @return Futures executing anti-compaction. + */ public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs, final Collection<Range<Token>> ranges, final Refs<SSTableReader> sstables, @@@ -522,7 -526,7 +542,8 @@@ * @param cfs * @param ranges Ranges that the repair was carried out on * @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them. + * @param txn Transaction across all SSTables that were repaired. 
+ * @param parentRepairSession parent repair session ID * @throws InterruptedException * @throws IOException */ @@@ -530,19 -534,12 +551,13 @@@ Collection<Range<Token>> ranges, Refs<SSTableReader> validatedForRepair, LifecycleTransaction txn, - long repairedAt) throws InterruptedException, IOException + long repairedAt, + UUID parentRepairSession) throws InterruptedException, IOException { - logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size()); - logger.trace("Starting anticompaction for ranges {}", ranges); + logger.info("[repair #{}] Starting anticompaction for {}.{} on {}/{} sstables", parentRepairSession, cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables()); + logger.trace("[repair #{}] Starting anticompaction for ranges {}", parentRepairSession, ranges); Set<SSTableReader> sstables = new HashSet<>(validatedForRepair); - Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); - // we should only notify that repair status changed if it actually did: - Set<SSTableReader> mutatedRepairStatusToNotify = new HashSet<>(); - Map<SSTableReader, Boolean> wasRepairedBefore = new HashMap<>(); - for (SSTableReader sstable : sstables) - wasRepairedBefore.put(sstable, sstable.isRepaired()); - + Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); // SSTables that were completely repaired only Set<SSTableReader> nonAnticompacting = new HashSet<>(); Iterator<SSTableReader> sstableIterator = sstables.iterator(); @@@ -562,12 -564,11 +582,11 @@@ { if (r.contains(sstableBounds.left) && r.contains(sstableBounds.right)) { - logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r); + logger.info("[repair #{}] SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", parentRepairSession, sstable, r); 
sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt); sstable.reloadSSTableMetadata(); - mutatedRepairStatuses.add(sstable); - if (!wasRepairedBefore.get(sstable)) - mutatedRepairStatusToNotify.add(sstable); + if (!nonAnticompacting.contains(sstable)) // don't notify if the SSTable was already repaired + mutatedRepairStatuses.add(sstable); sstableIterator.remove(); shouldAnticompact = true; break; @@@ -579,9 -580,12 +598,12 @@@ } } + if (!anticompactRanges.isEmpty()) - logger.info("SSTable {} ({}) will be anticompacted on ranges: {}", sstable, sstableBounds, String.join(", ", anticompactRanges)); ++ logger.info("[repair #{}] SSTable {} ({}) will be anticompacted on range {}", parentRepairSession, sstable, sstableBounds, String.join(", ", anticompactRanges)); + if (!shouldAnticompact) { - logger.info("[repair #{}] SSTable {} ({}) does not intersect repaired ranges {}, not touching repairedAt.", parentRepairSession, sstable, sstableBounds, normalizedRanges); - logger.info("SSTable {} ({}) not subject to anticompaction of repaired ranges {}, not touching repairedAt.", sstable, sstableBounds, normalizedRanges); ++ logger.info("[repair #{}] SSTable {} ({}) not subject to anticompaction of repaired ranges {}, not touching repairedAt.", parentRepairSession, sstable, sstableBounds, normalizedRanges); nonAnticompacting.add(sstable); sstableIterator.remove(); } @@@ -1245,19 -1244,11 +1267,11 @@@ */ private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, LifecycleTransaction repaired, long repairedAt) { - logger.info("Performing anticompaction on {} sstables", repaired.originals().size()); + int numAnticompact = repaired.originals().size(); + logger.info("Performing anticompaction on {} sstables", numAnticompact); //Group SSTables - Set<SSTableReader> sstables = repaired.originals(); - - // Repairs can take place on both unrepaired (incremental + full) and repaired (full) data. 
- // Although anti-compaction could work on repaired sstables as well and would result in having more accurate - // repairedAt values for these, we still avoid anti-compacting already repaired sstables, as we currently don't - // make use of any actual repairedAt value and splitting up sstables just for that is not worth it at this point. - Set<SSTableReader> unrepairedSSTables = sstables.stream().filter((s) -> !s.isRepaired()).collect(Collectors.toSet()); - - Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategyManager().groupSSTablesForAntiCompaction(unrepairedSSTables); - - Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repaired.originals()); ++ Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategyManager().groupSSTablesForAntiCompaction(repaired.originals()); // iterate over sstables to check if the repaired / unrepaired ranges intersect them. int antiCompactedSSTableCount = 0; for (Collection<SSTableReader> sstableGroup : groupedSSTables) http://git-wip-us.apache.org/repos/asf/cassandra/blob/489c2f69/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index ead0349,abd9a4a..8991f88 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@@ -21,13 -29,20 +21,17 @@@ import java.io.File import java.io.IOException; import java.util.Arrays; import java.util.Collection; -import java.util.Comparator; import java.util.List; +import java.util.Set; + import java.util.SortedSet; + import java.util.TreeSet; +import java.util.UUID; + import java.util.concurrent.ExecutionException; -import org.apache.cassandra.config.KSMetaData; -import org.apache.cassandra.exceptions.ConfigurationException; 
-import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.sstable.format.SSTableWriter; -import org.apache.cassandra.locator.SimpleStrategy; - +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; + import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.RateLimiter; import org.junit.BeforeClass; import org.junit.After; import org.junit.Test; @@@ -270,12 -273,119 +274,112 @@@ public class AntiCompactionTes try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); Refs<SSTableReader> refs = Refs.ref(sstables)) { - CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1); + CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, parentRepairSession); } - SSTableReader sstable = Iterables.get(store.getSSTables(), 0); - assertThat(store.getSSTables().size(), is(1)); ++ SSTableReader sstable = Iterables.get(store.getLiveSSTables(), 0); + assertThat(store.getLiveSSTables().size(), is(1)); - assertThat(Iterables.get(store.getLiveSSTables(), 0).isRepaired(), is(true)); - assertThat(Iterables.get(store.getLiveSSTables(), 0).selfRef().globalCount(), is(1)); + assertThat(sstable.isRepaired(), is(true)); + assertThat(sstable.selfRef().globalCount(), is(1)); + assertThat(store.getTracker().getCompacting().size(), is(0)); + } + + @Test + public void shouldAntiCompactSSTable() throws IOException, InterruptedException, ExecutionException + { + ColumnFamilyStore store = prepareColumnFamilyStore(); - Collection<SSTableReader> sstables = store.getUnrepairedSSTables(); - assertEquals(store.getSSTables().size(), sstables.size()); ++ Collection<SSTableReader> sstables = getUnrepairedSSTables(store); ++ assertEquals(store.getLiveSSTables().size(), sstables.size()); + // SSTable range is 0 - 10, repair just a subset of the ranges (0 - 4) of the SSTable. 
Should result in + // one repaired and one unrepaired SSTable + Range<Token> range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("4".getBytes())); + List<Range<Token>> ranges = Arrays.asList(range); ++ UUID parentRepairSession = UUID.randomUUID(); + + try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); + Refs<SSTableReader> refs = Refs.ref(sstables)) + { - CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1); ++ CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, parentRepairSession); + } + - Comparator<SSTableReader> generationReverseComparator = new Comparator<SSTableReader>() - { - public int compare(SSTableReader o1, SSTableReader o2) - { - return Integer.compare(o1.descriptor.generation, o2.descriptor.generation); - } - }; - - SortedSet<SSTableReader> sstablesSorted = new TreeSet<>(generationReverseComparator); - sstablesSorted.addAll(store.getSSTables()); ++ SortedSet<SSTableReader> sstablesSorted = new TreeSet<>(SSTableReader.generationReverseComparator.reversed()); ++ sstablesSorted.addAll(store.getLiveSSTables()); + + SSTableReader sstable = sstablesSorted.first(); - assertThat(store.getSSTables().size(), is(2)); ++ assertThat(store.getLiveSSTables().size(), is(2)); + assertThat(sstable.isRepaired(), is(true)); + assertThat(sstable.selfRef().globalCount(), is(1)); + assertThat(store.getTracker().getCompacting().size(), is(0)); + + // Test we don't anti-compact already repaired SSTables. 
repairedAt shouldn't change for the already repaired SSTable (first) - sstables = store.getSSTables(); ++ sstables = store.getLiveSSTables(); + // Range that's a subset of the repaired SSTable's ranges, so would cause an anti-compaction (if it wasn't repaired) + range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("2".getBytes())); + ranges = Arrays.asList(range); + try (Refs<SSTableReader> refs = Refs.ref(sstables)) + { + // use different repairedAt to ensure it doesn't change - ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200); ++ ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200, parentRepairSession); + fut.get(); + } + + sstablesSorted.clear(); - sstablesSorted.addAll(store.getSSTables()); ++ sstablesSorted.addAll(store.getLiveSSTables()); + assertThat(sstablesSorted.size(), is(2)); + assertThat(sstablesSorted.first().isRepaired(), is(true)); + assertThat(sstablesSorted.last().isRepaired(), is(false)); + assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(1L)); + assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(0L)); + assertThat(sstablesSorted.first().selfRef().globalCount(), is(1)); + assertThat(sstablesSorted.last().selfRef().globalCount(), is(1)); + assertThat(store.getTracker().getCompacting().size(), is(0)); + + // Test repairing all the ranges of the repaired SSTable. Should mutate repairedAt without anticompacting, + // but leave the unrepaired SSTable as is. 
+ range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("4".getBytes())); + ranges = Arrays.asList(range); + + try (Refs<SSTableReader> refs = Refs.ref(sstables)) + { + // Same repaired at, but should be changed on the repaired SSTable now - ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200); ++ ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200, parentRepairSession); + fut.get(); + } + + sstablesSorted.clear(); - sstablesSorted.addAll(store.getSSTables()); ++ sstablesSorted.addAll(store.getLiveSSTables()); + + assertThat(sstablesSorted.size(), is(2)); + assertThat(sstablesSorted.first().isRepaired(), is(true)); + assertThat(sstablesSorted.last().isRepaired(), is(false)); + assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(200L)); + assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(0L)); + assertThat(sstablesSorted.first().selfRef().globalCount(), is(1)); + assertThat(sstablesSorted.last().selfRef().globalCount(), is(1)); + assertThat(store.getTracker().getCompacting().size(), is(0)); + + // Repair whole range. 
Should mutate repairedAt on repaired SSTable (again) and + // mark unrepaired SSTable as repaired + range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("999".getBytes())); + ranges = Arrays.asList(range); + + try (Refs<SSTableReader> refs = Refs.ref(sstables)) + { + // Both SSTables should have repairedAt of 400 - ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 400); ++ ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 400, parentRepairSession); + fut.get(); + } + + sstablesSorted.clear(); - sstablesSorted.addAll(store.getSSTables()); ++ sstablesSorted.addAll(store.getLiveSSTables()); + + assertThat(sstablesSorted.size(), is(2)); + assertThat(sstablesSorted.first().isRepaired(), is(true)); + assertThat(sstablesSorted.last().isRepaired(), is(true)); + assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(400L)); + assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(400L)); + assertThat(sstablesSorted.first().selfRef().globalCount(), is(1)); + assertThat(sstablesSorted.last().selfRef().globalCount(), is(1)); assertThat(store.getTracker().getCompacting().size(), is(0)); } @@@ -332,11 -446,4 +436,10 @@@ ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF); store.truncateBlocking(); } + + private static Set<SSTableReader> getUnrepairedSSTables(ColumnFamilyStore cfs) + { + return ImmutableSet.copyOf(cfs.getTracker().getView().sstables(SSTableSet.LIVE, (s) -> !s.isRepaired())); + } + - } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org