This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
     new f41ea9f  Make sure LCS handles duplicate sstable added/removed notifications correctly.
f41ea9f is described below

commit f41ea9fb14936bca4aeea0ab2bf6d55c51f37f6a
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Wed Sep 9 12:49:42 2020 +0200

    Make sure LCS handles duplicate sstable added/removed notifications correctly.

    Patch by marcuse; reviewed by Blake Eggleston for CASSANDRA-14103
---
 CHANGES.txt                                         |   1 +
 NEWS.txt                                            |   8 +
 .../db/compaction/AbstractCompactionStrategy.java   |  18 +-
 .../db/compaction/CompactionStrategyManager.java    | 240 ++++++---------
 .../db/compaction/LeveledCompactionStrategy.java    |  28 +-
 .../db/compaction/LeveledGenerations.java           | 311 +++++++++++++++++++
 .../cassandra/db/compaction/LeveledManifest.java    | 340 +++++----------------
 .../apache/cassandra/tools/StandaloneScrubber.java  |  31 +-
 .../LongLeveledCompactionStrategyCQLTest.java       |  92 ++++++
 .../LongLeveledCompactionStrategyTest.java          |   2 +-
 test/unit/org/apache/cassandra/MockSchema.java      |  27 +-
 .../compaction/LeveledCompactionStrategyTest.java   | 238 ++++++++++++++-
 .../db/compaction/LeveledGenerationsTest.java       | 199 ++++++++++++
 13 files changed, 1076 insertions(+), 459 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 0a56c4f..117fa96 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.9
+ * Make sure LCS handles duplicate sstable added/removed notifications correctly (CASSANDRA-14103)
 3.11.8
  * Correctly interpret SASI's `max_compaction_flush_memory_in_mb` setting in megabytes not bytes (CASSANDRA-16071)

diff --git a/NEWS.txt b/NEWS.txt
index 077bd8b..b3246b5 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -42,6 +42,14 @@
    restore snapshots created with the previous major version using the
    'sstableloader' tool. You can upgrade the file format of your snapshots
    using the provided 'sstableupgrade' tool.
+3.11.9
+======
+Upgrading
+---------
+    - Custom compaction strategies must handle getting sstables added/removed notifications for
+      sstables already added/removed - see CASSANDRA-14103 for details. This has been a requirement
+      for correct operation since 3.11.0 due to an issue in CompactionStrategyManager.
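[Editorial illustration, not part of the patch: the upgrade note above means a custom compaction strategy's add/remove hooks must tolerate notifications for sstables it has already seen or already dropped. The sketch below shows one way such idempotent bookkeeping can look; `TrackedStrategy` and its `String` table identifiers are hypothetical stand-ins, not Cassandra APIs - a real strategy would track SSTableReader instances inside an AbstractCompactionStrategy subclass.]

import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in demonstrating idempotent handling of duplicate
// added/removed notifications, as required by the NEWS.txt upgrade note.
class TrackedStrategy
{
    private final Set<String> sstables = new HashSet<>();

    // A duplicate "added" notification is harmless: Set.add is a no-op for a known entry.
    synchronized void addSSTable(String sstable)
    {
        if (!sstables.add(sstable))
            System.out.println(sstable + " already tracked - ignoring duplicate add");
    }

    // A duplicate "removed" notification is harmless: Set.remove is a no-op for an unknown entry.
    synchronized void removeSSTable(String sstable)
    {
        if (!sstables.remove(sstable))
            System.out.println(sstable + " not tracked - ignoring duplicate remove");
    }

    // Mirrors the replace-style notification: remove first, then add, both idempotent.
    synchronized void replaceSSTables(Set<String> removed, Set<String> added)
    {
        removed.forEach(this::removeSSTable);
        added.forEach(this::addSSTable);
    }

    public static void main(String[] args)
    {
        TrackedStrategy strategy = new TrackedStrategy();
        strategy.addSSTable("sstable-1");
        strategy.addSSTable("sstable-1");    // duplicate add notification
        strategy.removeSSTable("sstable-2"); // remove for an sstable that was never added
    }
}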
+ 3.11.7 ====== diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 3a5e9aa..4298be8 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -309,22 +309,36 @@ public abstract class AbstractCompactionStrategy return getClass().getSimpleName(); } + /** + * Replaces sstables in the compaction strategy + * + * Note that implementations must be able to handle duplicate notifications here (that removed are already gone and + * added have already been added) + * */ public synchronized void replaceSSTables(Collection<SSTableReader> removed, Collection<SSTableReader> added) { for (SSTableReader remove : removed) removeSSTable(remove); - for (SSTableReader add : added) - addSSTable(add); + addSSTables(added); } + /** + * Adds sstable, note that implementations must handle duplicate notifications here (added already being in the compaction strategy) + */ public abstract void addSSTable(SSTableReader added); + /** + * Adds sstables, note that implementations must handle duplicate notifications here (added already being in the compaction strategy) + */ public synchronized void addSSTables(Iterable<SSTableReader> added) { for (SSTableReader sstable : added) addSSTable(sstable); } + /** + * Removes sstable from the strategy, implementations must be able to handle the sstable having already been removed. + */ public abstract void removeSSTable(SSTableReader sstable); public static class ScannerList implements AutoCloseable diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index d486679..c80504c 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -104,8 +104,10 @@ public class CompactionStrategyManager implements INotificationConsumer we will use the new compaction parameters. **/ private volatile CompactionParams schemaCompactionParams; - private boolean supportsEarlyOpen; - private int fanout; + private volatile boolean supportsEarlyOpen; + private volatile int fanout; + private volatile long maxSSTableSizeBytes; + private volatile String name; public CompactionStrategyManager(ColumnFamilyStore cfs) { @@ -217,6 +219,8 @@ public class CompactionStrategyManager implements INotificationConsumer unrepaired.forEach(AbstractCompactionStrategy::startup); supportsEarlyOpen = repaired.get(0).supportsEarlyOpen(); fanout = (repaired.get(0) instanceof LeveledCompactionStrategy) ? 
((LeveledCompactionStrategy) repaired.get(0)).getLevelFanoutSize() : LeveledCompactionStrategy.DEFAULT_LEVEL_FANOUT_SIZE; + name = repaired.get(0).getName(); + maxSSTableSizeBytes = repaired.get(0).getMaxSSTableBytes(); } finally { @@ -271,8 +275,7 @@ public class CompactionStrategyManager implements INotificationConsumer * @param sstable * @return */ - @VisibleForTesting - protected int compactionStrategyIndexFor(SSTableReader sstable) + int compactionStrategyIndexFor(SSTableReader sstable) { // should not call maybeReload because it may be called from within lock readLock.lock(); @@ -340,18 +343,17 @@ public class CompactionStrategyManager implements INotificationConsumer */ //TODO improve this to reload after receiving a notification rather than trying to reload on every operation @VisibleForTesting - protected boolean maybeReloadDiskBoundaries() + protected void maybeReloadDiskBoundaries() { if (!currentBoundaries.isOutOfDate()) - return false; + return; writeLock.lock(); try { if (!currentBoundaries.isOutOfDate()) - return false; + return; reload(params); - return true; } finally { @@ -434,7 +436,7 @@ public class CompactionStrategyManager implements INotificationConsumer { if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy) { - int[] res = new int[LeveledManifest.MAX_LEVEL_COUNT]; + int[] res = new int[LeveledGenerations.MAX_LEVEL_COUNT]; for (AbstractCompactionStrategy strategy : repaired) { int[] repairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize(); @@ -485,153 +487,120 @@ public class CompactionStrategyManager implements INotificationConsumer } } + /** + * Should only be called holding the readLock + */ private void handleFlushNotification(Iterable<SSTableReader> added) { - // If reloaded, SSTables will be placed in their correct locations - // so there is no need to process notification - if (maybeReloadDiskBoundaries()) - return; - - readLock.lock(); - try - { - for (SSTableReader sstable : added) - compactionStrategyFor(sstable).addSSTable(sstable); - } - finally - { - readLock.unlock(); - } + for (SSTableReader sstable : added) + compactionStrategyFor(sstable).addSSTable(sstable); } + /** + * Should only be called holding the readLock + */ private void handleListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed) { - // If reloaded, SSTables will be placed in their correct locations - // so there is no need to process notification - if (maybeReloadDiskBoundaries()) - return; - - readLock.lock(); - try - { - // a bit of gymnastics to be able to replace sstables in compaction strategies - // we use this to know that a compaction finished and where to start the next compaction in LCS - int locationSize = partitionSSTablesByTokenRange? currentBoundaries.directories.size() : 1; + // a bit of gymnastics to be able to replace sstables in compaction strategies + // we use this to know that a compaction finished and where to start the next compaction in LCS + int locationSize = partitionSSTablesByTokenRange? 
currentBoundaries.directories.size() : 1; - List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize); - List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize); - List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize); - List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize); + List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize); + List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize); + List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize); + List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize); - for (int i = 0; i < locationSize; i++) - { - repairedRemoved.add(new HashSet<>()); - repairedAdded.add(new HashSet<>()); - unrepairedRemoved.add(new HashSet<>()); - unrepairedAdded.add(new HashSet<>()); - } - - for (SSTableReader sstable : removed) - { - int i = compactionStrategyIndexFor(sstable); - if (sstable.isRepaired()) - repairedRemoved.get(i).add(sstable); - else - unrepairedRemoved.get(i).add(sstable); - } - for (SSTableReader sstable : added) - { - int i = compactionStrategyIndexFor(sstable); - if (sstable.isRepaired()) - repairedAdded.get(i).add(sstable); - else - unrepairedAdded.get(i).add(sstable); - } - for (int i = 0; i < locationSize; i++) - { - if (!repairedRemoved.get(i).isEmpty()) - repaired.get(i).replaceSSTables(repairedRemoved.get(i), repairedAdded.get(i)); - else - repaired.get(i).addSSTables(repairedAdded.get(i)); + for (int i = 0; i < locationSize; i++) + { + repairedRemoved.add(new HashSet<>()); + repairedAdded.add(new HashSet<>()); + unrepairedRemoved.add(new HashSet<>()); + unrepairedAdded.add(new HashSet<>()); + } - if (!unrepairedRemoved.get(i).isEmpty()) - unrepaired.get(i).replaceSSTables(unrepairedRemoved.get(i), unrepairedAdded.get(i)); - else - unrepaired.get(i).addSSTables(unrepairedAdded.get(i)); - } + for (SSTableReader sstable : removed) + { + int i = compactionStrategyIndexFor(sstable); + if (sstable.isRepaired()) + repairedRemoved.get(i).add(sstable); + else + unrepairedRemoved.get(i).add(sstable); } - finally + for (SSTableReader sstable : added) { - readLock.unlock(); + int i = compactionStrategyIndexFor(sstable); + if (sstable.isRepaired()) + repairedAdded.get(i).add(sstable); + else + unrepairedAdded.get(i).add(sstable); + } + for (int i = 0; i < locationSize; i++) + { + if (!repairedRemoved.get(i).isEmpty()) + repaired.get(i).replaceSSTables(repairedRemoved.get(i), repairedAdded.get(i)); + else + repaired.get(i).addSSTables(repairedAdded.get(i)); + + if (!unrepairedRemoved.get(i).isEmpty()) + unrepaired.get(i).replaceSSTables(unrepairedRemoved.get(i), unrepairedAdded.get(i)); + else + unrepaired.get(i).addSSTables(unrepairedAdded.get(i)); } } private void handleRepairStatusChangedNotification(Iterable<SSTableReader> sstables) { - // If reloaded, SSTables will be placed in their correct locations - // so there is no need to process notification - if (maybeReloadDiskBoundaries()) - return; - // we need a write lock here since we move sstables from one strategy instance to another - readLock.lock(); - try + for (SSTableReader sstable : sstables) { - for (SSTableReader sstable : sstables) + int index = compactionStrategyIndexFor(sstable); + if (sstable.isRepaired()) { - int index = compactionStrategyIndexFor(sstable); - if (sstable.isRepaired()) - { - unrepaired.get(index).removeSSTable(sstable); - repaired.get(index).addSSTable(sstable); - } - else - { - repaired.get(index).removeSSTable(sstable); - 
unrepaired.get(index).addSSTable(sstable); - } + unrepaired.get(index).removeSSTable(sstable); + repaired.get(index).addSSTable(sstable); + } + else + { + repaired.get(index).removeSSTable(sstable); + unrepaired.get(index).addSSTable(sstable); } } - finally - { - readLock.unlock(); - } + } private void handleDeletingNotification(SSTableReader deleted) { - // If reloaded, SSTables will be placed in their correct locations - // so there is no need to process notification - if (maybeReloadDiskBoundaries()) - return; - readLock.lock(); - try - { - compactionStrategyFor(deleted).removeSSTable(deleted); - } - finally - { - readLock.unlock(); - } + compactionStrategyFor(deleted).removeSSTable(deleted); } public void handleNotification(INotification notification, Object sender) { - if (notification instanceof SSTableAddedNotification) - { - handleFlushNotification(((SSTableAddedNotification) notification).added); - } - else if (notification instanceof SSTableListChangedNotification) - { - SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification; - handleListChangedNotification(listChangedNotification.added, listChangedNotification.removed); - } - else if (notification instanceof SSTableRepairStatusChanged) + // we might race with reload adding/removing the sstables, this means that compaction strategies + // must handle double notifications. + maybeReloadDiskBoundaries(); + readLock.lock(); + try { - handleRepairStatusChangedNotification(((SSTableRepairStatusChanged) notification).sstables); + if (notification instanceof SSTableAddedNotification) + { + handleFlushNotification(((SSTableAddedNotification) notification).added); + } + else if (notification instanceof SSTableListChangedNotification) + { + SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification; + handleListChangedNotification(listChangedNotification.added, listChangedNotification.removed); + } + else if (notification instanceof SSTableRepairStatusChanged) + { + handleRepairStatusChangedNotification(((SSTableRepairStatusChanged) notification).sstables); + } + else if (notification instanceof SSTableDeletingNotification) + { + handleDeletingNotification(((SSTableDeletingNotification) notification).deleting); + } } - else if (notification instanceof SSTableDeletingNotification) + finally { - handleDeletingNotification(((SSTableDeletingNotification) notification).deleting); + readLock.unlock(); } } @@ -750,15 +719,7 @@ public class CompactionStrategyManager implements INotificationConsumer public long getMaxSSTableBytes() { - readLock.lock(); - try - { - return unrepaired.get(0).getMaxSSTableBytes(); - } - finally - { - readLock.unlock(); - } + return maxSSTableSizeBytes; } public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes) @@ -925,16 +886,7 @@ public class CompactionStrategyManager implements INotificationConsumer public String getName() { - maybeReloadDiskBoundaries(); - readLock.lock(); - try - { - return unrepaired.get(0).getName(); - } - finally - { - readLock.unlock(); - } + return name; } public List<List<AbstractCompactionStrategy>> getStrategies() diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index 8c37bb4..77cb223 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ 
b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -161,7 +161,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy @SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput) { - Iterable<SSTableReader> sstables = manifest.getAllSSTables(); + Iterable<SSTableReader> sstables = manifest.getSSTables(); Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables); if (Iterables.isEmpty(sstables)) @@ -340,9 +340,15 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy } @Override + public void addSSTables(Iterable<SSTableReader> sstables) + { + manifest.addSSTables(sstables); + } + + @Override public void addSSTable(SSTableReader added) { - manifest.add(added); + manifest.addSSTables(Collections.singleton(added)); } @Override @@ -493,21 +499,17 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy level: for (int i = manifest.getLevelCount(); i >= 0; i--) { + if (manifest.getLevelSize(i) == 0) + continue; // sort sstables by droppable ratio in descending order - SortedSet<SSTableReader> sstables = manifest.getLevelSorted(i, new Comparator<SSTableReader>() - { - public int compare(SSTableReader o1, SSTableReader o2) - { - double r1 = o1.getEstimatedDroppableTombstoneRatio(gcBefore); - double r2 = o2.getEstimatedDroppableTombstoneRatio(gcBefore); - return -1 * Doubles.compare(r1, r2); - } + List<SSTableReader> tombstoneSortedSSTables = manifest.getLevelSorted(i, (o1, o2) -> { + double r1 = o1.getEstimatedDroppableTombstoneRatio(gcBefore); + double r2 = o2.getEstimatedDroppableTombstoneRatio(gcBefore); + return -1 * Doubles.compare(r1, r2); }); - if (sstables.isEmpty()) - continue; Set<SSTableReader> compacting = cfs.getTracker().getCompacting(); - for (SSTableReader sstable : sstables) + for (SSTableReader sstable : tombstoneSortedSSTables) { if (sstable.getEstimatedDroppableTombstoneRatio(gcBefore) <= tombstoneThreshold) continue level; diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java b/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java new file mode 100644 index 0000000..f7087f0 --- /dev/null +++ b/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.compaction; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; +import com.google.common.primitives.Ints; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.Config; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.FBUtilities; + +/** + * Handles the leveled manifest generations + * + * Not thread safe, all access should be synchronized in LeveledManifest + */ +class LeveledGenerations +{ + private static final Logger logger = LoggerFactory.getLogger(LeveledGenerations.class); + private final boolean strictLCSChecksTest = Boolean.getBoolean(Config.PROPERTY_PREFIX + "test.strict_lcs_checks"); + // allocate enough generations for a PB of data, with a 1-MB sstable size. (Note that if maxSSTableSize is + // updated, we will still have sstables of the older, potentially smaller size. So don't make this + // dependent on maxSSTableSize.) + static final int MAX_LEVEL_COUNT = (int) Math.log10(1000 * 1000 * 1000); + + private final Set<SSTableReader> l0 = new HashSet<>(); + private static long lastOverlapCheck = System.nanoTime(); + // note that since l0 is broken out, levels[0] represents L1: + private final TreeSet<SSTableReader> [] levels = new TreeSet[MAX_LEVEL_COUNT - 1]; + + private static final Comparator<SSTableReader> nonL0Comparator = (o1, o2) -> { + int cmp = SSTableReader.sstableComparator.compare(o1, o2); + if (cmp == 0) + cmp = Ints.compare(o1.descriptor.generation, o2.descriptor.generation); + return cmp; + }; + + LeveledGenerations() + { + for (int i = 0; i < MAX_LEVEL_COUNT - 1; i++) + levels[i] = new TreeSet<>(nonL0Comparator); + } + + Set<SSTableReader> get(int level) + { + if (level > levelCount() - 1 || level < 0) + throw new ArrayIndexOutOfBoundsException("Invalid generation " + level + " - maximum is " + (levelCount() - 1)); + if (level == 0) + return l0; + return levels[level - 1]; + } + + int levelCount() + { + return levels.length + 1; + } + + /** + * Adds readers to the correct level + * + * If adding an sstable would cause an overlap in the level (if level > 1) we send it to L0. This can happen + * for example when moving sstables from unrepaired to repaired. + * + * If the sstable is already in the manifest we skip it. 
+ * + * If the sstable exists in the manifest but has the wrong level, it is removed from the wrong level and added to the correct one + * + * todo: group sstables per level, add all if level is currently empty, improve startup speed + */ + void addAll(Iterable<SSTableReader> readers) + { + logDistribution(); + for (SSTableReader sstable : readers) + { + assert sstable.getSSTableLevel() < levelCount() : "Invalid level " + sstable.getSSTableLevel() + " out of " + (levelCount() - 1); + int existingLevel = getLevelIfExists(sstable); + if (existingLevel != -1) + { + if (sstable.getSSTableLevel() != existingLevel) + { + logger.error("SSTable {} on the wrong level in the manifest - {} instead of {} as recorded in the sstable metadata, removing from level {}", sstable, existingLevel, sstable.getSSTableLevel(), existingLevel); + if (strictLCSChecksTest) + throw new AssertionError("SSTable not in matching level in manifest: "+sstable + ": "+existingLevel+" != " + sstable.getSSTableLevel()); + get(existingLevel).remove(sstable); + } + else + { + logger.info("Manifest already contains {} in level {} - skipping", sstable, existingLevel); + continue; + } + } + + if (sstable.getSSTableLevel() == 0) + { + l0.add(sstable); + continue; + } + + TreeSet<SSTableReader> level = levels[sstable.getSSTableLevel() - 1]; + /* + current level: |-----||----||----| |---||---| + new sstable: |--| + ^ before + ^ after + overlap if before.last >= newsstable.first or after.first <= newsstable.last + */ + SSTableReader after = level.ceiling(sstable); + SSTableReader before = level.floor(sstable); + + if (before != null && before.last.compareTo(sstable.first) >= 0 || + after != null && after.first.compareTo(sstable.last) <= 0) + { + if (strictLCSChecksTest) // we can only assert this in tests since this is normal when for example moving sstables from unrepaired to repaired + throw new AssertionError("Got unexpected overlap in level "+sstable.getSSTableLevel()); + sendToL0(sstable); + } + else + { + level.add(sstable); + } + } + maybeVerifyLevels(); + } + + /** + * Sends sstable to L0 by mutating its level in the sstable metadata. + * + * SSTable should not exist in the manifest + */ + private void sendToL0(SSTableReader sstable) + { + try + { + sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 0); + sstable.reloadSSTableMetadata(); + } + catch (IOException e) + { + // Adding it to L0 and marking suspect is probably the best we can do here - it won't create overlap + // and we won't pick it for later compactions. + logger.error("Failed mutating sstable metadata for {} - adding it to L0 to avoid overlap. 
Marking suspect", sstable, e); + sstable.markSuspect(); + } + l0.add(sstable); + } + + /** + * Tries to find the sstable in the levels without using the sstable-recorded level + * + * Used to make sure we don't try to re-add an existing sstable + */ + private int getLevelIfExists(SSTableReader sstable) + { + for (int i = 0; i < levelCount(); i++) + { + if (get(i).contains(sstable)) + return i; + } + return -1; + } + + int remove(Collection<SSTableReader> readers) + { + int minLevel = Integer.MAX_VALUE; + for (SSTableReader sstable : readers) + { + int level = sstable.getSSTableLevel(); + minLevel = Math.min(minLevel, level); + get(level).remove(sstable); + } + return minLevel; + } + + int[] getAllLevelSize() + { + int[] counts = new int[levelCount()]; + for (int i = 0; i < levelCount(); i++) + counts[i] = get(i).size(); + return counts; + } + + Set<SSTableReader> allSSTables() + { + ImmutableSet.Builder<SSTableReader> builder = ImmutableSet.builder(); + builder.addAll(l0); + for (Set<SSTableReader> sstables : levels) + builder.addAll(sstables); + return builder.build(); + } + + /** + * given a level with sstables with first tokens [0, 10, 20, 30] and a lastCompactedSSTable with last = 15, we will + * return an Iterator over [20, 30, 0, 10]. + */ + Iterator<SSTableReader> wrappingIterator(int lvl, SSTableReader lastCompactedSSTable) + { + assert lvl > 0; // only makes sense in L1+ + TreeSet<SSTableReader> level = levels[lvl - 1]; + if (level.isEmpty()) + return Collections.emptyIterator(); + if (lastCompactedSSTable == null) + return level.iterator(); + + PeekingIterator<SSTableReader> tail = Iterators.peekingIterator(level.tailSet(lastCompactedSSTable).iterator()); + SSTableReader pivot = null; + // then we need to make sure that the first token of the pivot is greater than the last token of the lastCompactedSSTable + while (tail.hasNext()) + { + SSTableReader potentialPivot = tail.peek(); + if (potentialPivot.first.compareTo(lastCompactedSSTable.last) > 0) + { + pivot = potentialPivot; + break; + } + tail.next(); + } + + if (pivot == null) + return level.iterator(); + + return Iterators.concat(tail, level.headSet(pivot, false).iterator()); + } + + void logDistribution() + { + if (logger.isTraceEnabled()) + { + for (int i = 0; i < levelCount(); i++) + { + Set<SSTableReader> level = get(i); + if (!level.isEmpty()) + { + logger.trace("L{} contains {} SSTables ({}) in {}", + i, + level.size(), + FBUtilities.prettyPrintMemory(SSTableReader.getTotalBytes(level)), + this); + } + } + } + } + + Set<SSTableReader>[] snapshot() + { + Set<SSTableReader> [] levelsCopy = new Set[levelCount()]; + for (int i = 0; i < levelCount(); i++) + levelsCopy[i] = ImmutableSet.copyOf(get(i)); + return levelsCopy; + } + + /** + * do extra verification of the sstables in the generations + * + * only used during tests + */ + private void maybeVerifyLevels() + { + if (!strictLCSChecksTest || System.nanoTime() - lastOverlapCheck <= TimeUnit.NANOSECONDS.convert(5, TimeUnit.SECONDS)) + return; + logger.info("LCS verifying levels"); + lastOverlapCheck = System.nanoTime(); + for (int i = 1; i < levelCount(); i++) + { + SSTableReader prev = null; + for (SSTableReader sstable : get(i)) + { + // no overlap: + assert prev == null || prev.last.compareTo(sstable.first) < 0; + prev = sstable; + // make sure it does not exist in any other level: + for (int j = 0; j < levelCount(); j++) + { + if (i == j) + continue; + assert !get(j).contains(sstable); + } + } + } + } +} diff --git 
a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 8a8362f..d630730 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -17,19 +17,19 @@ */ package org.apache.cassandra.db.compaction; -import java.io.IOException; import java.util.*; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; import com.google.common.base.Predicates; -import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,9 +40,10 @@ import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; +import static org.apache.cassandra.db.compaction.LeveledGenerations.MAX_LEVEL_COUNT; + public class LeveledManifest { private static final Logger logger = LoggerFactory.getLogger(LeveledManifest.class); @@ -59,14 +60,12 @@ public class LeveledManifest * that level into lower level compactions */ private static final int NO_COMPACTION_LIMIT = 25; - // allocate enough generations for a PB of data, with a 1-MB sstable size. (Note that if maxSSTableSize is - // updated, we will still have sstables of the older, potentially smaller size. So don't make this - // dependent on maxSSTableSize.) 
- public static final int MAX_LEVEL_COUNT = (int) Math.log10(1000 * 1000 * 1000); + private final ColumnFamilyStore cfs; - @VisibleForTesting - protected final List<SSTableReader>[] generations; - private final PartitionPosition[] lastCompactedKeys; + + private final LeveledGenerations generations; + + private final SSTableReader[] lastCompactedSSTables; private final long maxSSTableSizeInBytes; private final SizeTieredCompactionStrategyOptions options; private final int [] compactionCounter; @@ -79,13 +78,8 @@ public class LeveledManifest this.options = options; this.levelFanoutSize = fanoutSize; - generations = new List[MAX_LEVEL_COUNT]; - lastCompactedKeys = new PartitionPosition[MAX_LEVEL_COUNT]; - for (int i = 0; i < generations.length; i++) - { - generations[i] = new ArrayList<>(); - lastCompactedKeys[i] = cfs.getPartitioner().getMinimumToken().minKeyBound(); - } + lastCompactedSSTables = new SSTableReader[MAX_LEVEL_COUNT]; + generations = new LeveledGenerations(); compactionCounter = new int[MAX_LEVEL_COUNT]; } @@ -99,14 +93,7 @@ public class LeveledManifest LeveledManifest manifest = new LeveledManifest(cfs, maxSSTableSize, fanoutSize, options); // ensure all SSTables are in the manifest - for (SSTableReader ssTableReader : sstables) - { - manifest.add(ssTableReader); - } - for (int i = 1; i < manifest.getAllLevelSize().length; i++) - { - manifest.repairOverlappingSSTables(i); - } + manifest.addSSTables(sstables); manifest.calculateLastCompactedKeys(); return manifest; } @@ -115,17 +102,18 @@ public class LeveledManifest * If we want to start compaction in level n, find the newest (by modification time) file in level n+1 * and use its last token for last compacted key in level n; */ - public void calculateLastCompactedKeys() + void calculateLastCompactedKeys() { - for (int i = 0; i < generations.length - 1; i++) + for (int i = 0; i < generations.levelCount() - 1; i++) { + Set<SSTableReader> level = generations.get(i + 1); // this level is empty - if (generations[i + 1].isEmpty()) + if (level.isEmpty()) continue; SSTableReader sstableWithMaxModificationTime = null; long maxModificationTime = Long.MIN_VALUE; - for (SSTableReader ssTableReader : generations[i + 1]) + for (SSTableReader ssTableReader : level) { long modificationTime = ssTableReader.getCreationTimeFor(Component.DATA); if (modificationTime >= maxModificationTime) @@ -135,80 +123,27 @@ public class LeveledManifest } } - lastCompactedKeys[i] = sstableWithMaxModificationTime.last; + lastCompactedSSTables[i] = sstableWithMaxModificationTime; } } - public synchronized void add(SSTableReader reader) + public synchronized void addSSTables(Iterable<SSTableReader> readers) { - int level = reader.getSSTableLevel(); - - assert level < generations.length : "Invalid level " + level + " out of " + (generations.length - 1); - logDistribution(); - if (canAddSSTable(reader)) - { - // adding the sstable does not cause overlap in the level - logger.trace("Adding {} to L{}", reader, level); - generations[level].add(reader); - } - else - { - // this can happen if: - // * a compaction has promoted an overlapping sstable to the given level, or - // was also supposed to add an sstable at the given level. 
- // * we are moving sstables from unrepaired to repaired and the sstable - // would cause overlap - // - // The add(..):ed sstable will be sent to level 0 - try - { - reader.descriptor.getMetadataSerializer().mutateLevel(reader.descriptor, 0); - reader.reloadSSTableMetadata(); - } - catch (IOException e) - { - logger.error("Could not change sstable level - adding it at level 0 anyway, we will find it at restart.", e); - } - if (!contains(reader)) - { - generations[0].add(reader); - } - else - { - // An SSTable being added multiple times to this manifest indicates a programming error, but we don't - // throw an AssertionError because this shouldn't break the compaction strategy. Instead we log it - // together with a RuntimeException so the stack is print for troubleshooting if this ever happens. - logger.warn("SSTable {} is already present on leveled manifest and should not be re-added.", reader, new RuntimeException()); - } - } - } - - private boolean contains(SSTableReader reader) - { - for (int i = 0; i < generations.length; i++) - { - if (generations[i].contains(reader)) - return true; - } - return false; + generations.addAll(readers); } public synchronized void replace(Collection<SSTableReader> removed, Collection<SSTableReader> added) { assert !removed.isEmpty(); // use add() instead of promote when adding new sstables - logDistribution(); if (logger.isTraceEnabled()) + { + generations.logDistribution(); logger.trace("Replacing [{}]", toString(removed)); + } // the level for the added sstables is the max of the removed ones, // plus one if the removed were all on the same level - int minLevel = Integer.MAX_VALUE; - - for (SSTableReader sstable : removed) - { - int thisLevel = remove(sstable); - minLevel = Math.min(minLevel, thisLevel); - } + int minLevel = generations.remove(removed); // it's valid to do a remove w/o an add (e.g. on truncate) if (added.isEmpty()) @@ -217,76 +152,8 @@ public class LeveledManifest if (logger.isTraceEnabled()) logger.trace("Adding [{}]", toString(added)); - for (SSTableReader ssTableReader : added) - add(ssTableReader); - lastCompactedKeys[minLevel] = SSTableReader.sstableOrdering.max(added).last; - } - - public synchronized void repairOverlappingSSTables(int level) - { - SSTableReader previous = null; - Collections.sort(generations[level], SSTableReader.sstableComparator); - List<SSTableReader> outOfOrderSSTables = new ArrayList<>(); - for (SSTableReader current : generations[level]) - { - if (previous != null && current.first.compareTo(previous.last) <= 0) - { - logger.warn("At level {}, {} [{}, {}] overlaps {} [{}, {}]. This could be caused by a bug in Cassandra 1.1.0 .. 1.1.3 or due to the fact that you have dropped sstables from another node into the data directory. " + - "Sending back to L0. If you didn't drop in sstables, and have not yet run scrub, you should do so since you may also have rows out-of-order within an sstable", - level, previous, previous.first, previous.last, current, current.first, current.last); - outOfOrderSSTables.add(current); - } - else - { - previous = current; - } - } - - if (!outOfOrderSSTables.isEmpty()) - { - for (SSTableReader sstable : outOfOrderSSTables) - sendBackToL0(sstable); - } - } - - /** - * Checks if adding the sstable creates an overlap in the level - * @param sstable the sstable to add - * @return true if it is safe to add the sstable in the level. 
- */ - private boolean canAddSSTable(SSTableReader sstable) - { - int level = sstable.getSSTableLevel(); - if (level == 0) - return true; - - List<SSTableReader> copyLevel = new ArrayList<>(generations[level]); - copyLevel.add(sstable); - Collections.sort(copyLevel, SSTableReader.sstableComparator); - - SSTableReader previous = null; - for (SSTableReader current : copyLevel) - { - if (previous != null && current.first.compareTo(previous.last) <= 0) - return false; - previous = current; - } - return true; - } - - private synchronized void sendBackToL0(SSTableReader sstable) - { - remove(sstable); - try - { - sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 0); - sstable.reloadSSTableMetadata(); - add(sstable); - } - catch (IOException e) - { - throw new RuntimeException("Could not reload sstable meta data", e); - } + generations.addAll(added); + lastCompactedSSTables[minLevel] = SSTableReader.sstableOrdering.max(added); } private String toString(Collection<SSTableReader> sstables) @@ -329,7 +196,7 @@ public class LeveledManifest // the streamed files can be placed in their original levels if (StorageService.instance.isBootstrapMode()) { - List<SSTableReader> mostInteresting = getSSTablesForSTCS(getLevel(0)); + List<SSTableReader> mostInteresting = getSSTablesForSTCS(generations.get(0)); if (!mostInteresting.isEmpty()) { logger.info("Bootstrapping - doing STCS in L0"); @@ -364,9 +231,9 @@ public class LeveledManifest // This isn't a magic wand -- if you are consistently writing too fast for LCS to keep // up, you're still screwed. But if instead you have intermittent bursts of activity, // it can help a lot. - for (int i = generations.length - 1; i > 0; i--) + for (int i = generations.levelCount() - 1; i > 0; i--) { - List<SSTableReader> sstables = getLevel(i); + Set<SSTableReader> sstables = generations.get(i); if (sstables.isEmpty()) continue; // mostly this just avoids polluting the debug log with zero scores // we want to calculate score excluding compacting ones @@ -400,7 +267,7 @@ public class LeveledManifest } // Higher levels are happy, time for a standard, non-STCS L0 compaction - if (getLevel(0).isEmpty()) + if (generations.get(0).isEmpty()) return null; Collection<SSTableReader> candidates = getCandidatesFor(0); if (candidates.isEmpty()) @@ -415,9 +282,9 @@ public class LeveledManifest private CompactionCandidate getSTCSInL0CompactionCandidate() { - if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > MAX_COMPACTING_L0) + if (!DatabaseDescriptor.getDisableSTCSInL0() && generations.get(0).size() > MAX_COMPACTING_L0) { - List<SSTableReader> mostInteresting = getSSTablesForSTCS(getLevel(0)); + List<SSTableReader> mostInteresting = getSSTablesForSTCS(generations.get(0)); if (!mostInteresting.isEmpty()) { logger.debug("L0 is too far behind, performing size-tiering there first"); @@ -454,7 +321,7 @@ public class LeveledManifest { Set<SSTableReader> withStarvedCandidate = new HashSet<>(candidates); - for (int i = generations.length - 1; i > 0; i--) + for (int i = generations.levelCount() - 1; i > 0; i--) compactionCounter[i]++; compactionCounter[targetLevel] = 0; if (logger.isTraceEnabled()) @@ -463,7 +330,7 @@ public class LeveledManifest logger.trace("CompactionCounter: {}: {}", j, compactionCounter[j]); } - for (int i = generations.length - 1; i > 0; i--) + for (int i = generations.levelCount() - 1; i > 0; i--) { if (getLevelSize(i) > 0) { @@ -486,9 +353,9 @@ public class LeveledManifest return candidates; Set<SSTableReader> compacting = 
cfs.getTracker().getCompacting(); Range<PartitionPosition> boundaries = new Range<>(min, max); - for (SSTableReader sstable : getLevel(i)) + for (SSTableReader sstable : generations.get(i)) { - Range<PartitionPosition> r = new Range<PartitionPosition>(sstable.first, sstable.last); + Range<PartitionPosition> r = new Range<>(sstable.first, sstable.last); if (boundaries.contains(r) && !compacting.contains(sstable)) { logger.info("Adding high-level (L{}) {} to candidates", sstable.getSSTableLevel(), sstable); @@ -506,35 +373,12 @@ public class LeveledManifest public synchronized int getLevelSize(int i) { - if (i >= generations.length) - throw new ArrayIndexOutOfBoundsException("Maximum valid generation is " + (generations.length - 1)); - return getLevel(i).size(); + return generations.get(i).size(); } public synchronized int[] getAllLevelSize() { - int[] counts = new int[generations.length]; - for (int i = 0; i < counts.length; i++) - counts[i] = getLevel(i).size(); - return counts; - } - - private void logDistribution() - { - if (logger.isTraceEnabled()) - { - for (int i = 0; i < generations.length; i++) - { - if (!getLevel(i).isEmpty()) - { - logger.trace("L{} contains {} SSTables ({}) in {}", - i, - getLevel(i).size(), - FBUtilities.prettyPrintMemory(SSTableReader.getTotalBytes(getLevel(i))), - this); - } - } - } + return generations.getAllLevelSize(); } @VisibleForTesting @@ -542,10 +386,15 @@ public class LeveledManifest { int level = reader.getSSTableLevel(); assert level >= 0 : reader + " not present in manifest: "+level; - generations[level].remove(reader); + generations.remove(Collections.singleton(reader)); return level; } + public synchronized Set<SSTableReader> getSSTables() + { + return generations.allSSTables(); + } + private static Set<SSTableReader> overlapping(Collection<SSTableReader> candidates, Iterable<SSTableReader> others) { assert !candidates.isEmpty(); @@ -591,7 +440,7 @@ public class LeveledManifest { assert start.compareTo(end) <= 0; Set<SSTableReader> overlapped = new HashSet<>(); - Bounds<Token> promotedBounds = new Bounds<Token>(start, end); + Bounds<Token> promotedBounds = new Bounds<>(start, end); for (Map.Entry<SSTableReader, Bounds<Token>> pair : sstables.entrySet()) { @@ -601,20 +450,12 @@ public class LeveledManifest return overlapped; } - private static final Predicate<SSTableReader> suspectP = new Predicate<SSTableReader>() - { - public boolean apply(SSTableReader candidate) - { - return candidate.isMarkedSuspect(); - } - }; - private static Map<SSTableReader, Bounds<Token>> genBounds(Iterable<SSTableReader> ssTableReaders) { Map<SSTableReader, Bounds<Token>> boundsMap = new HashMap<>(); for (SSTableReader sstable : ssTableReaders) { - boundsMap.put(sstable, new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())); + boundsMap.put(sstable, new Bounds<>(sstable.first.getToken(), sstable.last.getToken())); } return boundsMap; } @@ -626,14 +467,14 @@ public class LeveledManifest */ private Collection<SSTableReader> getCandidatesFor(int level) { - assert !getLevel(level).isEmpty(); + assert !generations.get(level).isEmpty(); logger.trace("Choosing candidates for L{}", level); final Set<SSTableReader> compacting = cfs.getTracker().getCompacting(); if (level == 0) { - Set<SSTableReader> compactingL0 = getCompacting(0); + Set<SSTableReader> compactingL0 = getCompactingL0(); PartitionPosition lastCompactingKey = null; PartitionPosition firstCompactingKey = null; @@ -659,7 +500,7 @@ public class LeveledManifest // basically screwed, since we expect all 
or most L0 sstables to overlap with each L1 sstable. // So if an L1 sstable is suspect we can't do much besides try anyway and hope for the best. Set<SSTableReader> candidates = new HashSet<>(); - Map<SSTableReader, Bounds<Token>> remaining = genBounds(Iterables.filter(getLevel(0), Predicates.not(suspectP))); + Map<SSTableReader, Bounds<Token>> remaining = genBounds(Iterables.filter(generations.get(0), Predicates.not(SSTableReader::isMarkedSuspect))); for (SSTableReader sstable : ageSortedSSTables(remaining.keySet())) { @@ -672,7 +513,7 @@ public class LeveledManifest for (SSTableReader newCandidate : overlappedL0) { - if (firstCompactingKey == null || lastCompactingKey == null || overlapping(firstCompactingKey.getToken(), lastCompactingKey.getToken(), Arrays.asList(newCandidate)).size() == 0) + if (firstCompactingKey == null || lastCompactingKey == null || overlapping(firstCompactingKey.getToken(), lastCompactingKey.getToken(), Collections.singleton(newCandidate)).size() == 0) candidates.add(newCandidate); remaining.remove(newCandidate); } @@ -691,7 +532,7 @@ public class LeveledManifest // add sstables from L1 that overlap candidates // if the overlapping ones are already busy in a compaction, leave it out. // TODO try to find a set of L0 sstables that only overlaps with non-busy L1 sstables - Set<SSTableReader> l1overlapping = overlapping(candidates, getLevel(1)); + Set<SSTableReader> l1overlapping = overlapping(candidates, generations.get(1)); if (Sets.intersection(l1overlapping, compacting).size() > 0) return Collections.emptyList(); if (!overlapping(candidates, compactingL0).isEmpty()) @@ -704,27 +545,16 @@ public class LeveledManifest return candidates; } - // for non-L0 compactions, pick up where we left off last time - Collections.sort(getLevel(level), SSTableReader.sstableComparator); - int start = 0; // handles case where the prior compaction touched the very last range - for (int i = 0; i < getLevel(level).size(); i++) - { - SSTableReader sstable = getLevel(level).get(i); - if (sstable.first.compareTo(lastCompactedKeys[level]) > 0) - { - start = i; - break; - } - } - // look for a non-suspect keyspace to compact with, starting with where we left off last time, // and wrapping back to the beginning of the generation if necessary - Map<SSTableReader, Bounds<Token>> sstablesNextLevel = genBounds(getLevel(level + 1)); - for (int i = 0; i < getLevel(level).size(); i++) + Map<SSTableReader, Bounds<Token>> sstablesNextLevel = genBounds(generations.get(level + 1)); + Iterator<SSTableReader> levelIterator = generations.wrappingIterator(level, lastCompactedSSTables[level]); + while (levelIterator.hasNext()) { - SSTableReader sstable = getLevel(level).get((start + i) % getLevel(level).size()); + SSTableReader sstable = levelIterator.next(); Set<SSTableReader> candidates = Sets.union(Collections.singleton(sstable), overlappingWithBounds(sstable, sstablesNextLevel)); - if (Iterables.any(candidates, suspectP)) + + if (Iterables.any(candidates, SSTableReader::isMarkedSuspect)) continue; if (Sets.intersection(candidates, compacting).isEmpty()) return candidates; @@ -734,10 +564,10 @@ public class LeveledManifest return Collections.emptyList(); } - private Set<SSTableReader> getCompacting(int level) + private Set<SSTableReader> getCompactingL0() { Set<SSTableReader> sstables = new HashSet<>(); - Set<SSTableReader> levelSSTables = new HashSet<>(getLevel(level)); + Set<SSTableReader> levelSSTables = new HashSet<>(generations.get(0)); for (SSTableReader sstable : cfs.getTracker().getCompacting()) 
{ if (levelSSTables.contains(sstable)) @@ -749,19 +579,14 @@ public class LeveledManifest @VisibleForTesting List<SSTableReader> ageSortedSSTables(Collection<SSTableReader> candidates) { - List<SSTableReader> ageSortedCandidates = new ArrayList<>(candidates); - Collections.sort(ageSortedCandidates, SSTableReader.maxTimestampAscending); - return ageSortedCandidates; + List<SSTableReader> copy = new ArrayList<>(candidates); + copy.sort(SSTableReader.maxTimestampAscending); + return ImmutableList.copyOf(copy); } public synchronized Set<SSTableReader>[] getSStablesPerLevelSnapshot() { - Set<SSTableReader>[] sstablesPerLevel = new Set[generations.length]; - for (int i = 0; i < generations.length; i++) - { - sstablesPerLevel[i] = new HashSet<>(generations[i]); - } - return sstablesPerLevel; + return generations.snapshot(); } @Override @@ -770,34 +595,24 @@ public class LeveledManifest return "Manifest@" + hashCode(); } - public int getLevelCount() + public synchronized int getLevelCount() { - for (int i = generations.length - 1; i >= 0; i--) + for (int i = generations.levelCount() - 1; i >= 0; i--) { - if (getLevel(i).size() > 0) + if (generations.get(i).size() > 0) return i; } return 0; } - public synchronized SortedSet<SSTableReader> getLevelSorted(int level, Comparator<SSTableReader> comparator) - { - return ImmutableSortedSet.copyOf(comparator, getLevel(level)); - } - - public List<SSTableReader> getLevel(int i) - { - return generations[i]; - } - public synchronized int getEstimatedTasks() { long tasks = 0; - long[] estimated = new long[generations.length]; + long[] estimated = new long[generations.levelCount()]; - for (int i = generations.length - 1; i >= 0; i--) + for (int i = generations.levelCount() - 1; i >= 0; i--) { - List<SSTableReader> sstables = getLevel(i); + Set<SSTableReader> sstables = generations.get(i); // If there is 1 byte over TBL - (MBL * 1.001), there is still a task left, so we need to round up. 
estimated[i] = (long)Math.ceil((double)Math.max(0L, SSTableReader.getTotalBytes(sstables) - (long)(maxBytesForLevel(i, maxSSTableSizeInBytes) * 1.001)) / (double)maxSSTableSizeInBytes); tasks += estimated[i]; @@ -829,17 +644,18 @@ public class LeveledManifest assert newLevel > 0; } return newLevel; + } + synchronized Set<SSTableReader> getLevel(int level) + { + return ImmutableSet.copyOf(generations.get(level)); } - public Iterable<SSTableReader> getAllSSTables() + synchronized List<SSTableReader> getLevelSorted(int level, Comparator<SSTableReader> comparator) { - Set<SSTableReader> sstables = new HashSet<>(); - for (List<SSTableReader> generation : generations) - { - sstables.addAll(generation); - } - return sstables; + List<SSTableReader> copy = new ArrayList<>(generations.get(level)); + copy.sort(comparator); + return ImmutableList.copyOf(copy); } public static class CompactionCandidate diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java index 2643438..2a4f293 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java +++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java @@ -245,30 +245,19 @@ public class StandaloneScrubber if (strategyManager.getCompactionParams().klass().equals(LeveledCompactionStrategy.class)) { int maxSizeInMB = (int)((cfs.getCompactionStrategyManager().getMaxSSTableBytes()) / (1024L * 1024L)); + int fanOut = cfs.getCompactionStrategyManager().getLevelFanoutSize(); + List<SSTableReader> repaired = new ArrayList<>(); + List<SSTableReader> unrepaired = new ArrayList<>(); - System.out.println("Checking leveled manifest"); - Predicate<SSTableReader> repairedPredicate = new Predicate<SSTableReader>() + for (SSTableReader sstable : sstables) { - @Override - public boolean apply(SSTableReader sstable) - { - return sstable.isRepaired(); - } - }; - - List<SSTableReader> repaired = Lists.newArrayList(Iterables.filter(sstables, repairedPredicate)); - List<SSTableReader> unRepaired = Lists.newArrayList(Iterables.filter(sstables, Predicates.not(repairedPredicate))); - - LeveledManifest repairedManifest = LeveledManifest.create(cfs, maxSizeInMB, cfs.getLevelFanoutSize(), repaired); - for (int i = 1; i < repairedManifest.getLevelCount(); i++) - { - repairedManifest.repairOverlappingSSTables(i); - } - LeveledManifest unRepairedManifest = LeveledManifest.create(cfs, maxSizeInMB, cfs.getLevelFanoutSize(), unRepaired); - for (int i = 1; i < unRepairedManifest.getLevelCount(); i++) - { - unRepairedManifest.repairOverlappingSSTables(i); + if (sstable.isRepaired()) + repaired.add(sstable); + else + unrepaired.add(sstable); } + LeveledManifest.create(cfs, maxSizeInMB, fanOut, repaired); + LeveledManifest.create(cfs, maxSizeInMB, fanOut, unrepaired); } } diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyCQLTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyCQLTest.java new file mode 100644 index 0000000..9bfa380 --- /dev/null +++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyCQLTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compaction; + +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.Test; + +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.Hex; + +public class LongLeveledCompactionStrategyCQLTest extends CQLTester +{ + + @Test + public void stressTestCompactionStrategyManager() throws ExecutionException, InterruptedException + { + System.setProperty(Config.PROPERTY_PREFIX + "test.strict_lcs_checks", "true"); + // flush/compact tons of sstables, invalidate token metadata in a loop to make CSM reload the strategies + createTable("create table %s (id int primary key, i text) with compaction = {'class':'LeveledCompactionStrategy', 'sstable_size_in_mb':1}"); + ExecutorService es = Executors.newSingleThreadExecutor(); + DatabaseDescriptor.setConcurrentCompactors(8); + AtomicBoolean stop = new AtomicBoolean(false); + long start = System.currentTimeMillis(); + try + { + Random r = new Random(); + Future<?> writes = es.submit(() -> { + + byte[] b = new byte[1024]; + while (!stop.get()) + { + + for (int i = 0 ; i < 100; i++) + { + try + { + r.nextBytes(b); + String s = Hex.bytesToHex(b); + execute("insert into %s (id, i) values (?,?)", r.nextInt(), s); + } + catch (Throwable throwable) + { + throw new RuntimeException(throwable); + } + } + getCurrentColumnFamilyStore().forceBlockingFlush(); + Uninterruptibles.sleepUninterruptibly(r.nextInt(200), TimeUnit.MILLISECONDS); + } + }); + + while(System.currentTimeMillis() - start < TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES)) + { + StorageService.instance.getTokenMetadata().invalidateCachedRings(); + Uninterruptibles.sleepUninterruptibly(r.nextInt(1000), TimeUnit.MILLISECONDS); + } + + stop.set(true); + writes.get(); + } + finally + { + es.shutdown(); + } + } +} diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java index 0d54173..ba3ad44 100644 --- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java +++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java @@ -128,7 +128,7 @@ public class LongLeveledCompactionStrategyTest int levels = manifest.getLevelCount(); for (int level = 0; level < levels; level++) { - List<SSTableReader> sstables = manifest.getLevel(level); + Set<SSTableReader> sstables = manifest.getLevel(level); // score check assert (double) 
SSTableReader.getTotalBytes(sstables) / manifest.maxBytesForLevel(level, 1 * 1024 * 1024) < 1.00; // overlap check for levels greater than 0 diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java index 804bccb..8208000 100644 --- a/test/unit/org/apache/cassandra/MockSchema.java +++ b/test/unit/org/apache/cassandra/MockSchema.java @@ -48,6 +48,8 @@ import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.utils.AlwaysPresentFilter; import org.apache.cassandra.utils.ByteBufferUtil; +import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE; + public class MockSchema { static @@ -84,6 +86,21 @@ public class MockSchema public static SSTableReader sstable(int generation, int size, boolean keepRef, ColumnFamilyStore cfs) { + return sstable(generation, size, keepRef, generation, generation, cfs); + } + + public static SSTableReader sstableWithLevel(int generation, long firstToken, long lastToken, int level, ColumnFamilyStore cfs) + { + return sstable(generation, 0, false, firstToken, lastToken, level, cfs); + } + + public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, ColumnFamilyStore cfs) + { + return sstable(generation, size, keepRef, firstToken, lastToken, 0, cfs); + } + + public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, int level, ColumnFamilyStore cfs) + { Descriptor descriptor = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(), cfs.keyspace.getName(), cfs.getColumnFamilyName(), @@ -117,12 +134,14 @@ public class MockSchema } SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.emptyList()); StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator) - .finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, -1, header) + .sstableLevel(level) + .finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, UNREPAIRED_SSTABLE, header) .get(MetadataType.STATS); SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata, RANDOM_ACCESS_READER_FACTORY.sharedCopy(), RANDOM_ACCESS_READER_FACTORY.sharedCopy(), indexSummary.sharedCopy(), new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL, header); - reader.first = reader.last = readerBounds(generation); + reader.first = readerBounds(firstToken); + reader.last = readerBounds(lastToken); if (!keepRef) reader.selfRef().release(); return reader; @@ -152,9 +171,9 @@ public class MockSchema return metadata; } - public static BufferDecoratedKey readerBounds(int generation) + public static BufferDecoratedKey readerBounds(long token) { - return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(generation), ByteBufferUtil.EMPTY_BYTE_BUFFER); + return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(token), ByteBufferUtil.EMPTY_BYTE_BUFFER); } private static File temp(String id) diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index b1d467e..1a3ac44 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -22,10 +22,14 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import 
java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.UUID; +import java.util.stream.Collectors; import junit.framework.Assert; import org.junit.After; @@ -37,6 +41,7 @@ import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.MockSchema; import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; @@ -157,7 +162,6 @@ public class LeveledCompactionStrategyTest assert groupLevel == tableLevel; } } - } /* @@ -296,7 +300,7 @@ public class LeveledCompactionStrategyTest strategy.manifest.remove(s); s.descriptor.getMetadataSerializer().mutateLevel(s.descriptor, 6); s.reloadSSTableMetadata(); - strategy.manifest.add(s); + strategy.manifest.addSSTables(Collections.singleton(s)); } // verify that all sstables in the changed set is level 6 for (SSTableReader s : cfs.getLiveSSTables()) @@ -345,14 +349,12 @@ public class LeveledCompactionStrategyTest for (SSTableReader sstable : cfs.getLiveSSTables()) assertFalse(sstable.isRepaired()); - int sstableCount = 0; - for (List<SSTableReader> level : unrepaired.manifest.generations) - sstableCount += level.size(); + int sstableCount = unrepaired.manifest.getSSTables().size(); // we only have unrepaired sstables: assertEquals(sstableCount, cfs.getLiveSSTables().size()); - SSTableReader sstable1 = unrepaired.manifest.generations[2].get(0); - SSTableReader sstable2 = unrepaired.manifest.generations[1].get(0); + SSTableReader sstable1 = unrepaired.manifest.getLevel(2).iterator().next(); + SSTableReader sstable2 = unrepaired.manifest.getLevel(1).iterator().next(); sstable1.descriptor.getMetadataSerializer().mutateRepairedAt(sstable1.descriptor, System.currentTimeMillis()); sstable1.reloadSSTableMetadata(); @@ -360,14 +362,12 @@ public class LeveledCompactionStrategyTest manager.handleNotification(new SSTableRepairStatusChanged(Arrays.asList(sstable1)), this); - int repairedSSTableCount = 0; - for (List<SSTableReader> level : repaired.manifest.generations) - repairedSSTableCount += level.size(); + int repairedSSTableCount = repaired.manifest.getSSTables().size(); assertEquals(1, repairedSSTableCount); // make sure the repaired sstable ends up in the same level in the repaired manifest: - assertTrue(repaired.manifest.generations[2].contains(sstable1)); + assertTrue(repaired.manifest.getLevel(2).contains(sstable1)); // and that it is gone from unrepaired - assertFalse(unrepaired.manifest.generations[2].contains(sstable1)); + assertFalse(unrepaired.manifest.getLevel(2).contains(sstable1)); unrepaired.removeSSTable(sstable2); manager.handleNotification(new SSTableAddedNotification(singleton(sstable2)), this); @@ -480,4 +480,218 @@ public class LeveledCompactionStrategyTest lastMaxTimeStamp = sstable.getMaxTimestamp(); } } + + @Test + public void testAddingOverlapping() + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions()); + List<SSTableReader> currentLevel = new ArrayList<>(); + int gen = 1; + currentLevel.add(MockSchema.sstableWithLevel(gen++, 10, 20, 1, cfs)); + currentLevel.add(MockSchema.sstableWithLevel(gen++, 21, 30, 1, cfs)); + currentLevel.add(MockSchema.sstableWithLevel(gen++, 51, 100, 1, cfs)); + currentLevel.add(MockSchema.sstableWithLevel(gen++, 80, 120, 1, cfs)); + 
currentLevel.add(MockSchema.sstableWithLevel(gen++, 90, 150, 1, cfs)); + + lm.addSSTables(currentLevel); + assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3)); + assertLevelsEqual(lm.getLevel(0), currentLevel.subList(3, 5)); + + List<SSTableReader> newSSTables = new ArrayList<>(); + // this sstable last token is the same as the first token of L1 above, should get sent to L0: + newSSTables.add(MockSchema.sstableWithLevel(gen++, 5, 10, 1, cfs)); + lm.addSSTables(newSSTables); + assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3)); + assertEquals(0, newSSTables.get(0).getSSTableLevel()); + assertTrue(lm.getLevel(0).containsAll(newSSTables)); + + newSSTables.clear(); + newSSTables.add(MockSchema.sstableWithLevel(gen++, 30, 40, 1, cfs)); + lm.addSSTables(newSSTables); + assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3)); + assertEquals(0, newSSTables.get(0).getSSTableLevel()); + assertTrue(lm.getLevel(0).containsAll(newSSTables)); + + newSSTables.clear(); + newSSTables.add(MockSchema.sstableWithLevel(gen++, 100, 140, 1, cfs)); + lm.addSSTables(newSSTables); + assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3)); + assertEquals(0, newSSTables.get(0).getSSTableLevel()); + assertTrue(lm.getLevel(0).containsAll(newSSTables)); + + newSSTables.clear(); + newSSTables.add(MockSchema.sstableWithLevel(gen++, 100, 140, 1, cfs)); + newSSTables.add(MockSchema.sstableWithLevel(gen++, 120, 140, 1, cfs)); + lm.addSSTables(newSSTables); + List<SSTableReader> newL1 = new ArrayList<>(currentLevel.subList(0, 3)); + newL1.add(newSSTables.get(1)); + assertLevelsEqual(lm.getLevel(1), newL1); + newSSTables.remove(1); + assertTrue(newSSTables.stream().allMatch(s -> s.getSSTableLevel() == 0)); + assertTrue(lm.getLevel(0).containsAll(newSSTables)); + } + + @Test + public void singleTokenSSTableTest() + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions()); + List<SSTableReader> expectedL1 = new ArrayList<>(); + + int gen = 1; + // single sstable, single token (100) + expectedL1.add(MockSchema.sstableWithLevel(gen++, 100, 100, 1, cfs)); + lm.addSSTables(expectedL1); + + List<SSTableReader> expectedL0 = new ArrayList<>(); + + // should get moved to L0: + expectedL0.add(MockSchema.sstableWithLevel(gen++, 99, 101, 1, cfs)); + expectedL0.add(MockSchema.sstableWithLevel(gen++, 100, 101, 1, cfs)); + expectedL0.add(MockSchema.sstableWithLevel(gen++, 99, 100, 1, cfs)); + expectedL0.add(MockSchema.sstableWithLevel(gen++, 100, 100, 1, cfs)); + lm.addSSTables(expectedL0); + + assertLevelsEqual(expectedL0, lm.getLevel(0)); + assertTrue(expectedL0.stream().allMatch(s -> s.getSSTableLevel() == 0)); + assertLevelsEqual(expectedL1, lm.getLevel(1)); + assertTrue(expectedL1.stream().allMatch(s -> s.getSSTableLevel() == 1)); + + // should work: + expectedL1.add(MockSchema.sstableWithLevel(gen++, 98, 99, 1, cfs)); + expectedL1.add(MockSchema.sstableWithLevel(gen++, 101, 101, 1, cfs)); + lm.addSSTables(expectedL1.subList(1, expectedL1.size())); + assertLevelsEqual(expectedL1, lm.getLevel(1)); + } + + @Test + public void randomMultiLevelAddTest() + { + int iterations = 100; + int levelCount = 8; + + ColumnFamilyStore cfs = MockSchema.newCFS(); + LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions()); + long seed = System.currentTimeMillis(); + Random r = new Random(seed); + List<SSTableReader> newLevels = generateNewRandomLevels(cfs, 40, levelCount, 0, r); + + int 
sstableCount = newLevels.size(); + lm.addSSTables(newLevels); + + int [] expectedLevelSizes = lm.getAllLevelSize(); + + for (int j = 0; j < iterations; j++) + { + newLevels = generateNewRandomLevels(cfs, 20, levelCount, sstableCount, r); + sstableCount += newLevels.size(); + + int[] canAdd = canAdd(lm, newLevels, levelCount); + for (int i = 0; i < levelCount; i++) + expectedLevelSizes[i] += canAdd[i]; + lm.addSSTables(newLevels); + } + + // and verify no levels overlap + int actualSSTableCount = 0; + for (int i = 0; i < levelCount; i++) + { + actualSSTableCount += lm.getLevelSize(i); + List<SSTableReader> level = new ArrayList<>(lm.getLevel(i)); + int lvl = i; + assertTrue(level.stream().allMatch(s -> s.getSSTableLevel() == lvl)); + if (i > 0) + { + level.sort(SSTableReader.sstableComparator); + SSTableReader prev = null; + for (SSTableReader sstable : level) + { + if (prev != null && sstable.first.compareTo(prev.last) <= 0) + { + String levelStr = level.stream().map(s -> String.format("[%s, %s]", s.first, s.last)).collect(Collectors.joining(", ")); + String overlap = String.format("sstable [%s, %s] overlaps with [%s, %s] in level %d (%s) ", sstable.first, sstable.last, prev.first, prev.last, i, levelStr); + Assert.fail("[seed = "+seed+"] overlap in level "+lvl+": " + overlap); + } + prev = sstable; + } + } + } + assertEquals(sstableCount, actualSSTableCount); + for (int i = 0; i < levelCount; i++) + assertEquals("[seed = " + seed + "] wrong sstable count in level = " + i, expectedLevelSizes[i], lm.getLevel(i).size()); + } + + private static List<SSTableReader> generateNewRandomLevels(ColumnFamilyStore cfs, int maxSSTableCountPerLevel, int levelCount, int startGen, Random r) + { + List<SSTableReader> newLevels = new ArrayList<>(); + for (int level = 0; level < levelCount; level++) + { + int numLevelSSTables = r.nextInt(maxSSTableCountPerLevel) + 1; + List<Integer> tokens = new ArrayList<>(numLevelSSTables * 2); + + for (int i = 0; i < numLevelSSTables * 2; i++) + tokens.add(r.nextInt(4000)); + Collections.sort(tokens); + for (int i = 0; i < tokens.size() - 1; i += 2) + { + SSTableReader sstable = MockSchema.sstableWithLevel(++startGen, tokens.get(i), tokens.get(i + 1), level, cfs); + newLevels.add(sstable); + } + } + return newLevels; + } + + /** + * brute-force checks if the new sstables can be added to the correct level in manifest + * + * @return count of expected sstables to add to each level + */ + private static int[] canAdd(LeveledManifest lm, List<SSTableReader> newSSTables, int levelCount) + { + Map<Integer, Collection<SSTableReader>> sstableGroups = new HashMap<>(); + newSSTables.forEach(s -> sstableGroups.computeIfAbsent(s.getSSTableLevel(), k -> new ArrayList<>()).add(s)); + + int[] canAdd = new int[levelCount]; + for (Map.Entry<Integer, Collection<SSTableReader>> lvlGroup : sstableGroups.entrySet()) + { + int level = lvlGroup.getKey(); + if (level == 0) + { + canAdd[0] += lvlGroup.getValue().size(); + continue; + } + + List<SSTableReader> newLevel = new ArrayList<>(lm.getLevel(level)); + for (SSTableReader sstable : lvlGroup.getValue()) + { + newLevel.add(sstable); + newLevel.sort(SSTableReader.sstableComparator); + + SSTableReader prev = null; + boolean kept = true; + for (SSTableReader sst : newLevel) + { + if (prev != null && prev.last.compareTo(sst.first) >= 0) + { + newLevel.remove(sstable); + kept = false; + break; + } + prev = sst; + } + if (kept) + canAdd[level] += 1; + else + canAdd[0] += 1; + } + } + return canAdd; + } + + private static void 
assertLevelsEqual(Collection<SSTableReader> l1, Collection<SSTableReader> l2) + { + assertEquals(l1.size(), l2.size()); + assertEquals(new HashSet<>(l1), new HashSet<>(l2)); + } } diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledGenerationsTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledGenerationsTest.java new file mode 100644 index 0000000..1f17bf8 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledGenerationsTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compaction; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.MockSchema; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.BufferDecoratedKey; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.ByteBufferUtil; + +import static junit.framework.Assert.assertFalse; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class LeveledGenerationsTest extends CQLTester +{ + @BeforeClass + public static void setUp() + { + DatabaseDescriptor.daemonInitialization(); + MockSchema.cleanup(); + } + + @Test + public void testWrappingIterable() + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + + LeveledGenerations gens = new LeveledGenerations(); + + for (int i = 0; i < 10; i++) + { + SSTableReader sstable = MockSchema.sstable(i, 5, true, i, i, 2, cfs); + gens.addAll(Collections.singleton(sstable)); + } + int gen = 10; + assertIter(gens.wrappingIterator(2, sst(++gen, cfs, 5, 5)), + 6, 5, 10); + assertIter(gens.wrappingIterator(2, null), + 0, 9, 10); + assertIter(gens.wrappingIterator(2, sst(++gen, cfs, -10, 0)), + 1, 0, 10); + assertIter(gens.wrappingIterator(2, sst(++gen, cfs, 5, 9)), + 0, 9, 10); + assertIter(gens.wrappingIterator(2, sst(++gen, cfs, 0, 1000)), + 0, 9, 10); + + gens.addAll(Collections.singleton(MockSchema.sstable(100, 5, true, 5, 10, 3, cfs))); + assertIter(gens.wrappingIterator(3, sst(++gen, cfs, -10, 0)), + 5, 5, 1); + assertIter(gens.wrappingIterator(3, sst(++gen, cfs, 0, 100)), + 5, 5, 1); + + gens.addAll(Collections.singleton(MockSchema.sstable(200, 5, true, 5, 10, 4, cfs))); + gens.addAll(Collections.singleton(MockSchema.sstable(201, 5, true, 40, 50, 4, cfs))); + assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 0, 0)), + 5, 40, 2); + 
assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 0, 5)), + 40, 5, 2); + assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 7, 8)), + 40, 5, 2); + assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 39, 39)), + 40, 5, 2); + assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 40, 40)), + 5, 40, 2); + assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 100, 1000)), + 5, 40, 2); + } + + @Test + public void testWrappingIterableWiderSSTables() + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + LeveledGenerations generations = new LeveledGenerations(); + int gen = 0; + generations.addAll(Lists.newArrayList( + sst(++gen, cfs, 0, 50), + sst(++gen, cfs, 51, 100), + sst(++gen, cfs, 150, 200))); + + assertIter(generations.wrappingIterator(2, sst(++gen, cfs, -100, -50)), + 0, 150, 3); + + assertIter(generations.wrappingIterator(2, sst(++gen, cfs, 0, 40)), + 51, 0, 3); + assertIter(generations.wrappingIterator(2, sst(++gen, cfs, 0, 50)), + 51, 0, 3); + assertIter(generations.wrappingIterator(2, sst(++gen, cfs, 0, 51)), + 150, 51, 3); + + assertIter(generations.wrappingIterator(2, sst(++gen, cfs, 100, 149)), + 150, 51, 3); + assertIter(generations.wrappingIterator(2, sst(++gen, cfs, 100, 300)), + 0, 150, 3); + + } + + @Test + public void testEmptyLevel() + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + LeveledGenerations generations = new LeveledGenerations(); + assertFalse(generations.wrappingIterator(3, sst(0, cfs, 0, 10)).hasNext()); + assertFalse(generations.wrappingIterator(3, null).hasNext()); + } + + @Test + public void testFillLevels() + { + LeveledGenerations generations = new LeveledGenerations(); + + ColumnFamilyStore cfs = MockSchema.newCFS(); + for (int i = 0; i < LeveledGenerations.MAX_LEVEL_COUNT; i++) + generations.addAll(Collections.singleton(MockSchema.sstableWithLevel(i, i, i, i, cfs))); + + for (int i = 0; i < generations.levelCount(); i++) + assertEquals(i, generations.get(i).iterator().next().getSSTableLevel()); + + assertEquals(9, generations.levelCount()); + + try + { + generations.get(9); + fail("don't have 9 generations"); + } + catch (ArrayIndexOutOfBoundsException e) + {} + try + { + generations.get(-1); + fail("don't have -1 generations"); + } + catch (ArrayIndexOutOfBoundsException e) + {} + } + + private void assertIter(Iterator<SSTableReader> iter, long first, long last, int expectedCount) + { + List<SSTableReader> drained = Lists.newArrayList(iter); + assertEquals(expectedCount, drained.size()); + assertEquals(dk(first).getToken(), first(drained).first.getToken()); + assertEquals(dk(last).getToken(), last(drained).first.getToken()); // we sort by first token, so this is the first token of the last sstable in iter + } + + private SSTableReader last(Iterable<SSTableReader> iter) + { + return Iterables.getLast(iter); + } + private SSTableReader first(Iterable<SSTableReader> iter) + { + SSTableReader first = Iterables.getFirst(iter, null); + if (first == null) + throw new RuntimeException(); + return first; + } + + private DecoratedKey dk(long x) + { + return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(x), ByteBufferUtil.bytes(x)); + } + private SSTableReader sst(int gen, ColumnFamilyStore cfs, long first, long last) + { + return MockSchema.sstable(gen, 5, true, first, last, 2, cfs); + } + + private void print(SSTableReader sstable) + { + System.out.println(String.format("%d %s %s %d", sstable.descriptor.generation, sstable.first, sstable.last, sstable.getSSTableLevel())); + } +} --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org
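
[Editor's note, appended for readers following the new tests in this commit.] testAddingOverlapping and singleTokenSSTableTest above assert that an sstable whose token range overlaps, or merely touches, an existing sstable in a level greater than 0 is demoted to L0 rather than being added to that level. Below is a minimal, hypothetical sketch of that overlap check, written against a plain TreeMap instead of the patch's LeveledGenerations class; the class and method names (LevelSketch, tryAdd) are illustrative only and do not exist in Cassandra.

    import java.util.Map;
    import java.util.TreeMap;

    class LevelSketch
    {
        // Token ranges already in this level, keyed by first token, value = last token.
        // Assumes the invariant that ranges in a level above L0 never overlap.
        private final TreeMap<Long, Long> ranges = new TreeMap<>();

        // Returns true if [first, last] fits into the level, false if the caller should
        // send the sstable to L0 instead (mirrors what testAddingOverlapping asserts).
        boolean tryAdd(long first, long last)
        {
            Map.Entry<Long, Long> floor = ranges.floorEntry(last);
            if (floor != null && floor.getValue() >= first)
                return false; // touches or overlaps an existing range
            ranges.put(first, last);
            return true;
        }

        public static void main(String[] args)
        {
            LevelSketch l1 = new LevelSketch();
            System.out.println(l1.tryAdd(10, 20)); // true  - empty level
            System.out.println(l1.tryAdd(21, 30)); // true  - adjacent, not overlapping
            System.out.println(l1.tryAdd(5, 10));  // false - its last token equals an existing first token
            System.out.println(l1.tryAdd(30, 40)); // false - its first token equals an existing last token
        }
    }

The point the tests rely on is that ranges within a level above L0 stay non-overlapping and sorted by first token, so a single floor lookup against the candidate's last token is enough to detect any conflict; anything that would conflict falls back to L0, where overlap is allowed. The real LeveledGenerations code also has to cope with duplicate add/remove notifications for sstables it has already seen, which this sketch does not attempt to model.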