This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
     new f41ea9f  Make sure LCS handles duplicate sstable added/removed 
notifications correctly.
f41ea9f is described below

commit f41ea9fb14936bca4aeea0ab2bf6d55c51f37f6a
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Wed Sep 9 12:49:42 2020 +0200

    Make sure LCS handles duplicate sstable added/removed notifications 
correctly.
    
    Patch by marcuse; reviewed by Blake Eggleston for CASSANDRA-14103
---
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   8 +
 .../db/compaction/AbstractCompactionStrategy.java  |  18 +-
 .../db/compaction/CompactionStrategyManager.java   | 240 ++++++---------
 .../db/compaction/LeveledCompactionStrategy.java   |  28 +-
 .../db/compaction/LeveledGenerations.java          | 311 +++++++++++++++++++
 .../cassandra/db/compaction/LeveledManifest.java   | 340 +++++----------------
 .../apache/cassandra/tools/StandaloneScrubber.java |  31 +-
 .../LongLeveledCompactionStrategyCQLTest.java      |  92 ++++++
 .../LongLeveledCompactionStrategyTest.java         |   2 +-
 test/unit/org/apache/cassandra/MockSchema.java     |  27 +-
 .../compaction/LeveledCompactionStrategyTest.java  | 238 ++++++++++++++-
 .../db/compaction/LeveledGenerationsTest.java      | 199 ++++++++++++
 13 files changed, 1076 insertions(+), 459 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 0a56c4f..117fa96 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.9
+ * Make sure LCS handles duplicate sstable added/removed notifications 
correctly (CASSANDRA-14103)
 
 3.11.8
  * Correctly interpret SASI's `max_compaction_flush_memory_in_mb` setting in 
megabytes not bytes (CASSANDRA-16071)
diff --git a/NEWS.txt b/NEWS.txt
index 077bd8b..b3246b5 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -42,6 +42,14 @@ restore snapshots created with the previous major version 
using the
 'sstableloader' tool. You can upgrade the file format of your snapshots
 using the provided 'sstableupgrade' tool.
 
+3.11.9
+======
+Upgrading
+---------
+   - Custom compaction strategies must handle receiving sstable added/removed 
notifications for
+     sstables that have already been added/removed - see CASSANDRA-14103 for details. This 
has been a requirement
+     for correct operation since 3.11.0 due to an issue in 
CompactionStrategyManager.
+
 3.11.7
 ======
 
diff --git 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 3a5e9aa..4298be8 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -309,22 +309,36 @@ public abstract class AbstractCompactionStrategy
         return getClass().getSimpleName();
     }
 
+    /**
+     * Replaces sstables in the compaction strategy
+     *
+     * Note that implementations must be able to handle duplicate 
notifications here (sstables in removed may already be gone and
+     * sstables in added may already have been added)
+     * */
     public synchronized void replaceSSTables(Collection<SSTableReader> 
removed, Collection<SSTableReader> added)
     {
         for (SSTableReader remove : removed)
             removeSSTable(remove);
-        for (SSTableReader add : added)
-            addSSTable(add);
+        addSSTables(added);
     }
 
+    /**
+     * Adds an sstable. Note that implementations must handle duplicate 
notifications here (the sstable may already be in the compaction strategy)
+     */
     public abstract void addSSTable(SSTableReader added);
 
+    /**
+     * Adds sstables, note that implementations must handle duplicate 
notifications here (added already being in the compaction strategy)
+     */
     public synchronized void addSSTables(Iterable<SSTableReader> added)
     {
         for (SSTableReader sstable : added)
             addSSTable(sstable);
     }
 
+    /**
+     * Removes an sstable from the strategy. Implementations must be able to 
handle the sstable having already been removed.
+     */
     public abstract void removeSSTable(SSTableReader sstable);
 
     public static class ScannerList implements AutoCloseable
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index d486679..c80504c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -104,8 +104,10 @@ public class CompactionStrategyManager implements 
INotificationConsumer
         we will use the new compaction parameters.
      **/
     private volatile CompactionParams schemaCompactionParams;
-    private boolean supportsEarlyOpen;
-    private int fanout;
+    private volatile boolean supportsEarlyOpen;
+    private volatile int fanout;
+    private volatile long maxSSTableSizeBytes;
+    private volatile String name;
 
     public CompactionStrategyManager(ColumnFamilyStore cfs)
     {
@@ -217,6 +219,8 @@ public class CompactionStrategyManager implements 
INotificationConsumer
             unrepaired.forEach(AbstractCompactionStrategy::startup);
             supportsEarlyOpen = repaired.get(0).supportsEarlyOpen();
             fanout = (repaired.get(0) instanceof LeveledCompactionStrategy) ? 
((LeveledCompactionStrategy) repaired.get(0)).getLevelFanoutSize() : 
LeveledCompactionStrategy.DEFAULT_LEVEL_FANOUT_SIZE;
+            name = repaired.get(0).getName();
+            maxSSTableSizeBytes = repaired.get(0).getMaxSSTableBytes();
         }
         finally
         {
@@ -271,8 +275,7 @@ public class CompactionStrategyManager implements 
INotificationConsumer
      * @param sstable
      * @return
      */
-    @VisibleForTesting
-    protected int compactionStrategyIndexFor(SSTableReader sstable)
+    int compactionStrategyIndexFor(SSTableReader sstable)
     {
         // should not call maybeReload because it may be called from within 
lock
         readLock.lock();
@@ -340,18 +343,17 @@ public class CompactionStrategyManager implements 
INotificationConsumer
      */
     //TODO improve this to reload after receiving a notification rather than 
trying to reload on every operation
     @VisibleForTesting
-    protected boolean maybeReloadDiskBoundaries()
+    protected void maybeReloadDiskBoundaries()
     {
         if (!currentBoundaries.isOutOfDate())
-            return false;
+            return;
 
         writeLock.lock();
         try
         {
             if (!currentBoundaries.isOutOfDate())
-                return false;
+                return;
             reload(params);
-            return true;
         }
         finally
         {
@@ -434,7 +436,7 @@ public class CompactionStrategyManager implements 
INotificationConsumer
         {
             if (repaired.get(0) instanceof LeveledCompactionStrategy && 
unrepaired.get(0) instanceof LeveledCompactionStrategy)
             {
-                int[] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
+                int[] res = new int[LeveledGenerations.MAX_LEVEL_COUNT];
                 for (AbstractCompactionStrategy strategy : repaired)
                 {
                     int[] repairedCountPerLevel = ((LeveledCompactionStrategy) 
strategy).getAllLevelSize();
@@ -485,153 +487,120 @@ public class CompactionStrategyManager implements 
INotificationConsumer
         }
     }
 
+    /**
+     * Should only be called holding the readLock
+     */
     private void handleFlushNotification(Iterable<SSTableReader> added)
     {
-        // If reloaded, SSTables will be placed in their correct locations
-        // so there is no need to process notification
-        if (maybeReloadDiskBoundaries())
-            return;
-
-        readLock.lock();
-        try
-        {
-            for (SSTableReader sstable : added)
-                compactionStrategyFor(sstable).addSSTable(sstable);
-        }
-        finally
-        {
-            readLock.unlock();
-        }
+        for (SSTableReader sstable : added)
+            compactionStrategyFor(sstable).addSSTable(sstable);
     }
 
+    /**
+     * Should only be called holding the readLock
+     */
     private void handleListChangedNotification(Iterable<SSTableReader> added, 
Iterable<SSTableReader> removed)
     {
-        // If reloaded, SSTables will be placed in their correct locations
-        // so there is no need to process notification
-        if (maybeReloadDiskBoundaries())
-            return;
-
-        readLock.lock();
-        try
-        {
-            // a bit of gymnastics to be able to replace sstables in 
compaction strategies
-            // we use this to know that a compaction finished and where to 
start the next compaction in LCS
-            int locationSize = partitionSSTablesByTokenRange? 
currentBoundaries.directories.size() : 1;
+        // a bit of gymnastics to be able to replace sstables in compaction 
strategies
+        // we use this to know that a compaction finished and where to start 
the next compaction in LCS
+        int locationSize = partitionSSTablesByTokenRange? 
currentBoundaries.directories.size() : 1;
 
-            List<Set<SSTableReader>> repairedRemoved = new 
ArrayList<>(locationSize);
-            List<Set<SSTableReader>> repairedAdded = new 
ArrayList<>(locationSize);
-            List<Set<SSTableReader>> unrepairedRemoved = new 
ArrayList<>(locationSize);
-            List<Set<SSTableReader>> unrepairedAdded = new 
ArrayList<>(locationSize);
+        List<Set<SSTableReader>> repairedRemoved = new 
ArrayList<>(locationSize);
+        List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
+        List<Set<SSTableReader>> unrepairedRemoved = new 
ArrayList<>(locationSize);
+        List<Set<SSTableReader>> unrepairedAdded = new 
ArrayList<>(locationSize);
 
-            for (int i = 0; i < locationSize; i++)
-            {
-                repairedRemoved.add(new HashSet<>());
-                repairedAdded.add(new HashSet<>());
-                unrepairedRemoved.add(new HashSet<>());
-                unrepairedAdded.add(new HashSet<>());
-            }
-
-            for (SSTableReader sstable : removed)
-            {
-                int i = compactionStrategyIndexFor(sstable);
-                if (sstable.isRepaired())
-                    repairedRemoved.get(i).add(sstable);
-                else
-                    unrepairedRemoved.get(i).add(sstable);
-            }
-            for (SSTableReader sstable : added)
-            {
-                int i = compactionStrategyIndexFor(sstable);
-                if (sstable.isRepaired())
-                    repairedAdded.get(i).add(sstable);
-                else
-                    unrepairedAdded.get(i).add(sstable);
-            }
-            for (int i = 0; i < locationSize; i++)
-            {
-                if (!repairedRemoved.get(i).isEmpty())
-                    repaired.get(i).replaceSSTables(repairedRemoved.get(i), 
repairedAdded.get(i));
-                else
-                    repaired.get(i).addSSTables(repairedAdded.get(i));
+        for (int i = 0; i < locationSize; i++)
+        {
+            repairedRemoved.add(new HashSet<>());
+            repairedAdded.add(new HashSet<>());
+            unrepairedRemoved.add(new HashSet<>());
+            unrepairedAdded.add(new HashSet<>());
+        }
 
-                if (!unrepairedRemoved.get(i).isEmpty())
-                    
unrepaired.get(i).replaceSSTables(unrepairedRemoved.get(i), 
unrepairedAdded.get(i));
-                else
-                    unrepaired.get(i).addSSTables(unrepairedAdded.get(i));
-            }
+        for (SSTableReader sstable : removed)
+        {
+            int i = compactionStrategyIndexFor(sstable);
+            if (sstable.isRepaired())
+                repairedRemoved.get(i).add(sstable);
+            else
+                unrepairedRemoved.get(i).add(sstable);
         }
-        finally
+        for (SSTableReader sstable : added)
         {
-            readLock.unlock();
+            int i = compactionStrategyIndexFor(sstable);
+            if (sstable.isRepaired())
+                repairedAdded.get(i).add(sstable);
+            else
+                unrepairedAdded.get(i).add(sstable);
+        }
+        for (int i = 0; i < locationSize; i++)
+        {
+            if (!repairedRemoved.get(i).isEmpty())
+                repaired.get(i).replaceSSTables(repairedRemoved.get(i), 
repairedAdded.get(i));
+            else
+                repaired.get(i).addSSTables(repairedAdded.get(i));
+
+            if (!unrepairedRemoved.get(i).isEmpty())
+                unrepaired.get(i).replaceSSTables(unrepairedRemoved.get(i), 
unrepairedAdded.get(i));
+            else
+                unrepaired.get(i).addSSTables(unrepairedAdded.get(i));
         }
     }
 
     private void handleRepairStatusChangedNotification(Iterable<SSTableReader> 
sstables)
     {
-        // If reloaded, SSTables will be placed in their correct locations
-        // so there is no need to process notification
-        if (maybeReloadDiskBoundaries())
-            return;
-        // we need a write lock here since we move sstables from one strategy 
instance to another
-        readLock.lock();
-        try
+        for (SSTableReader sstable : sstables)
         {
-            for (SSTableReader sstable : sstables)
+            int index = compactionStrategyIndexFor(sstable);
+            if (sstable.isRepaired())
             {
-                int index = compactionStrategyIndexFor(sstable);
-                if (sstable.isRepaired())
-                {
-                    unrepaired.get(index).removeSSTable(sstable);
-                    repaired.get(index).addSSTable(sstable);
-                }
-                else
-                {
-                    repaired.get(index).removeSSTable(sstable);
-                    unrepaired.get(index).addSSTable(sstable);
-                }
+                unrepaired.get(index).removeSSTable(sstable);
+                repaired.get(index).addSSTable(sstable);
+            }
+            else
+            {
+                repaired.get(index).removeSSTable(sstable);
+                unrepaired.get(index).addSSTable(sstable);
             }
         }
-        finally
-        {
-            readLock.unlock();
-        }
+
     }
 
     private void handleDeletingNotification(SSTableReader deleted)
     {
-        // If reloaded, SSTables will be placed in their correct locations
-        // so there is no need to process notification
-        if (maybeReloadDiskBoundaries())
-            return;
-        readLock.lock();
-        try
-        {
-            compactionStrategyFor(deleted).removeSSTable(deleted);
-        }
-        finally
-        {
-            readLock.unlock();
-        }
+        compactionStrategyFor(deleted).removeSSTable(deleted);
     }
 
     public void handleNotification(INotification notification, Object sender)
     {
-        if (notification instanceof SSTableAddedNotification)
-        {
-            handleFlushNotification(((SSTableAddedNotification) 
notification).added);
-        }
-        else if (notification instanceof SSTableListChangedNotification)
-        {
-            SSTableListChangedNotification listChangedNotification = 
(SSTableListChangedNotification) notification;
-            handleListChangedNotification(listChangedNotification.added, 
listChangedNotification.removed);
-        }
-        else if (notification instanceof SSTableRepairStatusChanged)
+        // we might race with reload adding/removing the sstables, this means 
that compaction strategies
+        // must handle double notifications.
+        maybeReloadDiskBoundaries();
+        readLock.lock();
+        try
         {
-            
handleRepairStatusChangedNotification(((SSTableRepairStatusChanged) 
notification).sstables);
+            if (notification instanceof SSTableAddedNotification)
+            {
+                handleFlushNotification(((SSTableAddedNotification) 
notification).added);
+            }
+            else if (notification instanceof SSTableListChangedNotification)
+            {
+                SSTableListChangedNotification listChangedNotification = 
(SSTableListChangedNotification) notification;
+                handleListChangedNotification(listChangedNotification.added, 
listChangedNotification.removed);
+            }
+            else if (notification instanceof SSTableRepairStatusChanged)
+            {
+                
handleRepairStatusChangedNotification(((SSTableRepairStatusChanged) 
notification).sstables);
+            }
+            else if (notification instanceof SSTableDeletingNotification)
+            {
+                handleDeletingNotification(((SSTableDeletingNotification) 
notification).deleting);
+            }
         }
-        else if (notification instanceof SSTableDeletingNotification)
+        finally
         {
-            handleDeletingNotification(((SSTableDeletingNotification) 
notification).deleting);
+            readLock.unlock();
         }
     }
 
@@ -750,15 +719,7 @@ public class CompactionStrategyManager implements 
INotificationConsumer
 
     public long getMaxSSTableBytes()
     {
-        readLock.lock();
-        try
-        {
-            return unrepaired.get(0).getMaxSSTableBytes();
-        }
-        finally
-        {
-            readLock.unlock();
-        }
+        return maxSSTableSizeBytes;
     }
 
     public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, 
int gcBefore, long maxSSTableBytes)
@@ -925,16 +886,7 @@ public class CompactionStrategyManager implements 
INotificationConsumer
 
     public String getName()
     {
-        maybeReloadDiskBoundaries();
-        readLock.lock();
-        try
-        {
-            return unrepaired.get(0).getName();
-        }
-        finally
-        {
-            readLock.unlock();
-        }
+        return name;
     }
 
     public List<List<AbstractCompactionStrategy>> getStrategies()
diff --git 
a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 8c37bb4..77cb223 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -161,7 +161,7 @@ public class LeveledCompactionStrategy extends 
AbstractCompactionStrategy
     @SuppressWarnings("resource") // transaction is closed by 
AbstractCompactionTask::execute
     public synchronized Collection<AbstractCompactionTask> getMaximalTask(int 
gcBefore, boolean splitOutput)
     {
-        Iterable<SSTableReader> sstables = manifest.getAllSSTables();
+        Iterable<SSTableReader> sstables = manifest.getSSTables();
 
         Iterable<SSTableReader> filteredSSTables = 
filterSuspectSSTables(sstables);
         if (Iterables.isEmpty(sstables))
@@ -340,9 +340,15 @@ public class LeveledCompactionStrategy extends 
AbstractCompactionStrategy
     }
 
     @Override
+    public void addSSTables(Iterable<SSTableReader> sstables)
+    {
+        manifest.addSSTables(sstables);
+    }
+
+    @Override
     public void addSSTable(SSTableReader added)
     {
-        manifest.add(added);
+        manifest.addSSTables(Collections.singleton(added));
     }
 
     @Override
@@ -493,21 +499,17 @@ public class LeveledCompactionStrategy extends 
AbstractCompactionStrategy
         level:
         for (int i = manifest.getLevelCount(); i >= 0; i--)
         {
+            if (manifest.getLevelSize(i) == 0)
+                continue;
             // sort sstables by droppable ratio in descending order
-            SortedSet<SSTableReader> sstables = manifest.getLevelSorted(i, new 
Comparator<SSTableReader>()
-            {
-                public int compare(SSTableReader o1, SSTableReader o2)
-                {
-                    double r1 = 
o1.getEstimatedDroppableTombstoneRatio(gcBefore);
-                    double r2 = 
o2.getEstimatedDroppableTombstoneRatio(gcBefore);
-                    return -1 * Doubles.compare(r1, r2);
-                }
+            List<SSTableReader> tombstoneSortedSSTables = 
manifest.getLevelSorted(i, (o1, o2) -> {
+                double r1 = o1.getEstimatedDroppableTombstoneRatio(gcBefore);
+                double r2 = o2.getEstimatedDroppableTombstoneRatio(gcBefore);
+                return -1 * Doubles.compare(r1, r2);
             });
-            if (sstables.isEmpty())
-                continue;
 
             Set<SSTableReader> compacting = cfs.getTracker().getCompacting();
-            for (SSTableReader sstable : sstables)
+            for (SSTableReader sstable : tombstoneSortedSSTables)
             {
                 if (sstable.getEstimatedDroppableTombstoneRatio(gcBefore) <= 
tombstoneThreshold)
                     continue level;
diff --git 
a/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java 
b/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
new file mode 100644
index 0000000..f7087f0
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.primitives.Ints;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Handles the leveled manifest generations
+ *
+ * Not thread safe, all access should be synchronized in LeveledManifest
+ */
+class LeveledGenerations
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(LeveledGenerations.class);
+    private final boolean strictLCSChecksTest = 
Boolean.getBoolean(Config.PROPERTY_PREFIX + "test.strict_lcs_checks");
+    // allocate enough generations for a PB of data, with a 1-MB sstable size. 
 (Note that if maxSSTableSize is
+    // updated, we will still have sstables of the older, potentially smaller 
size.  So don't make this
+    // dependent on maxSSTableSize.)
+    static final int MAX_LEVEL_COUNT = (int) Math.log10(1000 * 1000 * 1000);
+
+    private final Set<SSTableReader> l0 = new HashSet<>();
+    private static long lastOverlapCheck = System.nanoTime();
+    // note that since l0 is broken out, levels[0] represents L1:
+    private final TreeSet<SSTableReader> [] levels = new 
TreeSet[MAX_LEVEL_COUNT - 1];
+
+    private static final Comparator<SSTableReader> nonL0Comparator = (o1, o2) 
-> {
+        int cmp = SSTableReader.sstableComparator.compare(o1, o2);
+        if (cmp == 0)
+            cmp = Ints.compare(o1.descriptor.generation, 
o2.descriptor.generation);
+        return cmp;
+    };
+
+    LeveledGenerations()
+    {
+        for (int i = 0; i < MAX_LEVEL_COUNT - 1; i++)
+            levels[i] = new TreeSet<>(nonL0Comparator);
+    }
+
+    Set<SSTableReader> get(int level)
+    {
+        if (level > levelCount() - 1 || level < 0)
+            throw new ArrayIndexOutOfBoundsException("Invalid generation " + 
level + " - maximum is " + (levelCount() - 1));
+        if (level == 0)
+            return l0;
+        return levels[level - 1];
+    }
+
+    int levelCount()
+    {
+        return levels.length + 1;
+    }
+
+    /**
+     * Adds readers to the correct level
+     *
+     * If adding an sstable would cause an overlap in the level (if level > 0) 
we send it to L0. This can happen
+     * for example when moving sstables from unrepaired to repaired.
+     *
+     * If the sstable is already in the manifest we skip it.
+     *
+     * If the sstable exists in the manifest but has the wrong level, it is 
removed from the wrong level and added to the correct one
+     *
+     * todo: group sstables per level, add all if level is currently empty, 
improve startup speed
+     */
+    void addAll(Iterable<SSTableReader> readers)
+    {
+        logDistribution();
+        for (SSTableReader sstable : readers)
+        {
+            assert sstable.getSSTableLevel() < levelCount() : "Invalid level " 
+ sstable.getSSTableLevel() + " out of " + (levelCount() - 1);
+            int existingLevel = getLevelIfExists(sstable);
+            if (existingLevel != -1)
+            {
+                if (sstable.getSSTableLevel() != existingLevel)
+                {
+                    logger.error("SSTable {} on the wrong level in the 
manifest - {} instead of {} as recorded in the sstable metadata, removing from 
level {}", sstable, existingLevel, sstable.getSSTableLevel(), existingLevel);
+                    if (strictLCSChecksTest)
+                        throw new AssertionError("SSTable not in matching 
level in manifest: "+sstable + ": "+existingLevel+" != " + 
sstable.getSSTableLevel());
+                    get(existingLevel).remove(sstable);
+                }
+                else
+                {
+                    logger.info("Manifest already contains {} in level {} - 
skipping", sstable, existingLevel);
+                    continue;
+                }
+            }
+
+            if (sstable.getSSTableLevel() == 0)
+            {
+                l0.add(sstable);
+                continue;
+            }
+
+            TreeSet<SSTableReader> level = levels[sstable.getSSTableLevel() - 
1];
+            /*
+            current level: |-----||----||----|        |---||---|
+              new sstable:                      |--|
+                                          ^ before
+                                                        ^ after
+                overlap if before.last >= newsstable.first or after.first <= 
newsstable.last
+             */
+            SSTableReader after = level.ceiling(sstable);
+            SSTableReader before = level.floor(sstable);
+
+            if (before != null && before.last.compareTo(sstable.first) >= 0 ||
+                after != null && after.first.compareTo(sstable.last) <= 0)
+            {
+                if (strictLCSChecksTest) // we can only assert this in tests 
since this is normal when for example moving sstables from unrepaired to 
repaired
+                    throw new AssertionError("Got unexpected overlap in level 
"+sstable.getSSTableLevel());
+                sendToL0(sstable);
+            }
+            else
+            {
+                level.add(sstable);
+            }
+        }
+        maybeVerifyLevels();
+    }
+
+    /**
+     * Sends sstable to L0 by mutating its level in the sstable metadata.
+     *
+     * SSTable should not exist in the manifest
+     */
+    private void sendToL0(SSTableReader sstable)
+    {
+        try
+        {
+            
sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 0);
+            sstable.reloadSSTableMetadata();
+        }
+        catch (IOException e)
+        {
+            // Adding it to L0 and marking suspect is probably the best we can 
do here - it won't create overlap
+            // and we won't pick it for later compactions.
+            logger.error("Failed mutating sstable metadata for {} - adding it 
to L0 to avoid overlap. Marking suspect", sstable, e);
+            sstable.markSuspect();
+        }
+        l0.add(sstable);
+    }
+
+    /**
+     * Tries to find the sstable in the levels without using the 
sstable-recorded level
+     *
+     * Used to make sure we don't try to re-add an existing sstable
+     */
+    private int getLevelIfExists(SSTableReader sstable)
+    {
+        for (int i = 0; i < levelCount(); i++)
+        {
+            if (get(i).contains(sstable))
+                return i;
+        }
+        return -1;
+    }
+
+    int remove(Collection<SSTableReader> readers)
+    {
+        int minLevel = Integer.MAX_VALUE;
+        for (SSTableReader sstable : readers)
+        {
+            int level = sstable.getSSTableLevel();
+            minLevel = Math.min(minLevel, level);
+            get(level).remove(sstable);
+        }
+        return minLevel;
+    }
+
+    int[] getAllLevelSize()
+    {
+        int[] counts = new int[levelCount()];
+        for (int i = 0; i < levelCount(); i++)
+            counts[i] = get(i).size();
+        return counts;
+    }
+
+    Set<SSTableReader> allSSTables()
+    {
+        ImmutableSet.Builder<SSTableReader> builder = ImmutableSet.builder();
+        builder.addAll(l0);
+        for (Set<SSTableReader> sstables : levels)
+            builder.addAll(sstables);
+        return builder.build();
+    }
+
+    /**
+     * given a level with sstables with first tokens [0, 10, 20, 30] and a 
lastCompactedSSTable with last = 15, we will
+     * return an Iterator over [20, 30, 0, 10].
+     */
+    Iterator<SSTableReader> wrappingIterator(int lvl, SSTableReader 
lastCompactedSSTable)
+    {
+        assert lvl > 0; // only makes sense in L1+
+        TreeSet<SSTableReader> level = levels[lvl - 1];
+        if (level.isEmpty())
+            return Collections.emptyIterator();
+        if (lastCompactedSSTable == null)
+            return level.iterator();
+
+        PeekingIterator<SSTableReader> tail = 
Iterators.peekingIterator(level.tailSet(lastCompactedSSTable).iterator());
+        SSTableReader pivot = null;
+        // then we need to make sure that the first token of the pivot is 
greater than the last token of the lastCompactedSSTable
+        while (tail.hasNext())
+        {
+            SSTableReader potentialPivot = tail.peek();
+            if (potentialPivot.first.compareTo(lastCompactedSSTable.last) > 0)
+            {
+                pivot = potentialPivot;
+                break;
+            }
+            tail.next();
+        }
+
+        if (pivot == null)
+            return level.iterator();
+
+        return Iterators.concat(tail, level.headSet(pivot, false).iterator());
+    }
+
+    void logDistribution()
+    {
+        if (logger.isTraceEnabled())
+        {
+            for (int i = 0; i < levelCount(); i++)
+            {
+                Set<SSTableReader> level = get(i);
+                if (!level.isEmpty())
+                {
+                    logger.trace("L{} contains {} SSTables ({}) in {}",
+                                 i,
+                                 level.size(),
+                                 
FBUtilities.prettyPrintMemory(SSTableReader.getTotalBytes(level)),
+                                 this);
+                }
+            }
+        }
+    }
+
+    Set<SSTableReader>[] snapshot()
+    {
+        Set<SSTableReader> [] levelsCopy = new Set[levelCount()];
+        for (int i = 0; i < levelCount(); i++)
+            levelsCopy[i] = ImmutableSet.copyOf(get(i));
+        return levelsCopy;
+    }
+
+    /**
+     * do extra verification of the sstables in the generations
+     *
+     * only used during tests
+     */
+    private void maybeVerifyLevels()
+    {
+        if (!strictLCSChecksTest || System.nanoTime() - lastOverlapCheck <= 
TimeUnit.NANOSECONDS.convert(5, TimeUnit.SECONDS))
+            return;
+        logger.info("LCS verifying levels");
+        lastOverlapCheck = System.nanoTime();
+        for (int i = 1; i < levelCount(); i++)
+        {
+            SSTableReader prev = null;
+            for (SSTableReader sstable : get(i))
+            {
+                // no overlap:
+                assert prev == null || prev.last.compareTo(sstable.first) < 0;
+                prev = sstable;
+                // make sure it does not exist in any other level:
+                for (int j = 0; j < levelCount(); j++)
+                {
+                    if (i == j)
+                        continue;
+                    assert !get(j).contains(sstable);
+                }
+            }
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java 
b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 8a8362f..d630730 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -17,19 +17,19 @@
  */
 package org.apache.cassandra.db.compaction;
 
-import java.io.IOException;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,9 +40,10 @@ import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
+import static org.apache.cassandra.db.compaction.LeveledGenerations.MAX_LEVEL_COUNT;
+
 public class LeveledManifest
 {
     private static final Logger logger = LoggerFactory.getLogger(LeveledManifest.class);
@@ -59,14 +60,12 @@ public class LeveledManifest
      * that level into lower level compactions
      */
     private static final int NO_COMPACTION_LIMIT = 25;
-    // allocate enough generations for a PB of data, with a 1-MB sstable size. 
 (Note that if maxSSTableSize is
-    // updated, we will still have sstables of the older, potentially smaller 
size.  So don't make this
-    // dependent on maxSSTableSize.)
-    public static final int MAX_LEVEL_COUNT = (int) Math.log10(1000 * 1000 * 
1000);
+
     private final ColumnFamilyStore cfs;
-    @VisibleForTesting
-    protected final List<SSTableReader>[] generations;
-    private final PartitionPosition[] lastCompactedKeys;
+
+    private final LeveledGenerations generations;
+
+    private final SSTableReader[] lastCompactedSSTables;
     private final long maxSSTableSizeInBytes;
     private final SizeTieredCompactionStrategyOptions options;
     private final int [] compactionCounter;
@@ -79,13 +78,8 @@ public class LeveledManifest
         this.options = options;
         this.levelFanoutSize = fanoutSize;
 
-        generations = new List[MAX_LEVEL_COUNT];
-        lastCompactedKeys = new PartitionPosition[MAX_LEVEL_COUNT];
-        for (int i = 0; i < generations.length; i++)
-        {
-            generations[i] = new ArrayList<>();
-            lastCompactedKeys[i] = 
cfs.getPartitioner().getMinimumToken().minKeyBound();
-        }
+        lastCompactedSSTables = new SSTableReader[MAX_LEVEL_COUNT];
+        generations = new LeveledGenerations();
         compactionCounter = new int[MAX_LEVEL_COUNT];
     }
 
@@ -99,14 +93,7 @@ public class LeveledManifest
         LeveledManifest manifest = new LeveledManifest(cfs, maxSSTableSize, fanoutSize, options);
 
         // ensure all SSTables are in the manifest
-        for (SSTableReader ssTableReader : sstables)
-        {
-            manifest.add(ssTableReader);
-        }
-        for (int i = 1; i < manifest.getAllLevelSize().length; i++)
-        {
-            manifest.repairOverlappingSSTables(i);
-        }
+        manifest.addSSTables(sstables);
         manifest.calculateLastCompactedKeys();
         return manifest;
     }
@@ -115,17 +102,18 @@ public class LeveledManifest
      * If we want to start compaction in level n, find the newest (by modification time) file in level n+1
      * and use its last token for last compacted key in level n;
      */
-    public void calculateLastCompactedKeys()
+    void calculateLastCompactedKeys()
     {
-        for (int i = 0; i < generations.length - 1; i++)
+        for (int i = 0; i < generations.levelCount() - 1; i++)
         {
+            Set<SSTableReader> level = generations.get(i + 1);
             // this level is empty
-            if (generations[i + 1].isEmpty())
+            if (level.isEmpty())
                 continue;
 
             SSTableReader sstableWithMaxModificationTime = null;
             long maxModificationTime = Long.MIN_VALUE;
-            for (SSTableReader ssTableReader : generations[i + 1])
+            for (SSTableReader ssTableReader : level)
             {
                 long modificationTime = ssTableReader.getCreationTimeFor(Component.DATA);
                 if (modificationTime >= maxModificationTime)
@@ -135,80 +123,27 @@ public class LeveledManifest
                 }
             }
 
-            lastCompactedKeys[i] = sstableWithMaxModificationTime.last;
+            lastCompactedSSTables[i] = sstableWithMaxModificationTime;
         }
     }
 
-    public synchronized void add(SSTableReader reader)
+    public synchronized void addSSTables(Iterable<SSTableReader> readers)
     {
-        int level = reader.getSSTableLevel();
-
-        assert level < generations.length : "Invalid level " + level + " out 
of " + (generations.length - 1);
-        logDistribution();
-        if (canAddSSTable(reader))
-        {
-            // adding the sstable does not cause overlap in the level
-            logger.trace("Adding {} to L{}", reader, level);
-            generations[level].add(reader);
-        }
-        else
-        {
-            // this can happen if:
-            // * a compaction has promoted an overlapping sstable to the given 
level, or
-            //   was also supposed to add an sstable at the given level.
-            // * we are moving sstables from unrepaired to repaired and the 
sstable
-            //   would cause overlap
-            //
-            // The add(..):ed sstable will be sent to level 0
-            try
-            {
-                
reader.descriptor.getMetadataSerializer().mutateLevel(reader.descriptor, 0);
-                reader.reloadSSTableMetadata();
-            }
-            catch (IOException e)
-            {
-                logger.error("Could not change sstable level - adding it at 
level 0 anyway, we will find it at restart.", e);
-            }
-            if (!contains(reader))
-            {
-                generations[0].add(reader);
-            }
-            else
-            {
-                // An SSTable being added multiple times to this manifest 
indicates a programming error, but we don't
-                // throw an AssertionError because this shouldn't break the 
compaction strategy. Instead we log it
-                // together with a RuntimeException so the stack is print for 
troubleshooting if this ever happens.
-                logger.warn("SSTable {} is already present on leveled manifest 
and should not be re-added.", reader, new RuntimeException());
-            }
-        }
-    }
-
-    private boolean contains(SSTableReader reader)
-    {
-        for (int i = 0; i < generations.length; i++)
-        {
-            if (generations[i].contains(reader))
-                return true;
-        }
-        return false;
+        generations.addAll(readers);
     }
 
     public synchronized void replace(Collection<SSTableReader> removed, Collection<SSTableReader> added)
     {
         assert !removed.isEmpty(); // use add() instead of promote when adding 
new sstables
-        logDistribution();
         if (logger.isTraceEnabled())
+        {
+            generations.logDistribution();
             logger.trace("Replacing [{}]", toString(removed));
+        }
 
         // the level for the added sstables is the max of the removed ones,
         // plus one if the removed were all on the same level
-        int minLevel = Integer.MAX_VALUE;
-
-        for (SSTableReader sstable : removed)
-        {
-            int thisLevel = remove(sstable);
-            minLevel = Math.min(minLevel, thisLevel);
-        }
+        int minLevel = generations.remove(removed);
 
         // it's valid to do a remove w/o an add (e.g. on truncate)
         if (added.isEmpty())
@@ -217,76 +152,8 @@ public class LeveledManifest
         if (logger.isTraceEnabled())
             logger.trace("Adding [{}]", toString(added));
 
-        for (SSTableReader ssTableReader : added)
-            add(ssTableReader);
-        lastCompactedKeys[minLevel] = 
SSTableReader.sstableOrdering.max(added).last;
-    }
-
-    public synchronized void repairOverlappingSSTables(int level)
-    {
-        SSTableReader previous = null;
-        Collections.sort(generations[level], SSTableReader.sstableComparator);
-        List<SSTableReader> outOfOrderSSTables = new ArrayList<>();
-        for (SSTableReader current : generations[level])
-        {
-            if (previous != null && current.first.compareTo(previous.last) <= 
0)
-            {
-                logger.warn("At level {}, {} [{}, {}] overlaps {} [{}, {}].  
This could be caused by a bug in Cassandra 1.1.0 .. 1.1.3 or due to the fact 
that you have dropped sstables from another node into the data directory. " +
-                            "Sending back to L0.  If you didn't drop in 
sstables, and have not yet run scrub, you should do so since you may also have 
rows out-of-order within an sstable",
-                            level, previous, previous.first, previous.last, 
current, current.first, current.last);
-                outOfOrderSSTables.add(current);
-            }
-            else
-            {
-                previous = current;
-            }
-        }
-
-        if (!outOfOrderSSTables.isEmpty())
-        {
-            for (SSTableReader sstable : outOfOrderSSTables)
-                sendBackToL0(sstable);
-        }
-    }
-
-    /**
-     * Checks if adding the sstable creates an overlap in the level
-     * @param sstable the sstable to add
-     * @return true if it is safe to add the sstable in the level.
-     */
-    private boolean canAddSSTable(SSTableReader sstable)
-    {
-        int level = sstable.getSSTableLevel();
-        if (level == 0)
-            return true;
-
-        List<SSTableReader> copyLevel = new ArrayList<>(generations[level]);
-        copyLevel.add(sstable);
-        Collections.sort(copyLevel, SSTableReader.sstableComparator);
-
-        SSTableReader previous = null;
-        for (SSTableReader current : copyLevel)
-        {
-            if (previous != null && current.first.compareTo(previous.last) <= 
0)
-                return false;
-            previous = current;
-        }
-        return true;
-    }
-
-    private synchronized void sendBackToL0(SSTableReader sstable)
-    {
-        remove(sstable);
-        try
-        {
-            
sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 0);
-            sstable.reloadSSTableMetadata();
-            add(sstable);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException("Could not reload sstable meta data", 
e);
-        }
+        generations.addAll(added);
+        lastCompactedSSTables[minLevel] = SSTableReader.sstableOrdering.max(added);
     }
 
     private String toString(Collection<SSTableReader> sstables)
@@ -329,7 +196,7 @@ public class LeveledManifest
         // the streamed files can be placed in their original levels
         if (StorageService.instance.isBootstrapMode())
         {
-            List<SSTableReader> mostInteresting = 
getSSTablesForSTCS(getLevel(0));
+            List<SSTableReader> mostInteresting = getSSTablesForSTCS(generations.get(0));
             if (!mostInteresting.isEmpty())
             {
                 logger.info("Bootstrapping - doing STCS in L0");
@@ -364,9 +231,9 @@ public class LeveledManifest
         // This isn't a magic wand -- if you are consistently writing too fast for LCS to keep
         // up, you're still screwed.  But if instead you have intermittent bursts of activity,
         // it can help a lot.
-        for (int i = generations.length - 1; i > 0; i--)
+        for (int i = generations.levelCount() - 1; i > 0; i--)
         {
-            List<SSTableReader> sstables = getLevel(i);
+            Set<SSTableReader> sstables = generations.get(i);
             if (sstables.isEmpty())
                 continue; // mostly this just avoids polluting the debug log with zero scores
             // we want to calculate score excluding compacting ones
@@ -400,7 +267,7 @@ public class LeveledManifest
         }
 
         // Higher levels are happy, time for a standard, non-STCS L0 compaction
-        if (getLevel(0).isEmpty())
+        if (generations.get(0).isEmpty())
             return null;
         Collection<SSTableReader> candidates = getCandidatesFor(0);
         if (candidates.isEmpty())
@@ -415,9 +282,9 @@ public class LeveledManifest
 
     private CompactionCandidate getSTCSInL0CompactionCandidate()
     {
-        if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > 
MAX_COMPACTING_L0)
+        if (!DatabaseDescriptor.getDisableSTCSInL0() && generations.get(0).size() > MAX_COMPACTING_L0)
         {
-            List<SSTableReader> mostInteresting = 
getSSTablesForSTCS(getLevel(0));
+            List<SSTableReader> mostInteresting = getSSTablesForSTCS(generations.get(0));
             if (!mostInteresting.isEmpty())
             {
                 logger.debug("L0 is too far behind, performing size-tiering 
there first");
@@ -454,7 +321,7 @@ public class LeveledManifest
     {
         Set<SSTableReader> withStarvedCandidate = new HashSet<>(candidates);
 
-        for (int i = generations.length - 1; i > 0; i--)
+        for (int i = generations.levelCount() - 1; i > 0; i--)
             compactionCounter[i]++;
         compactionCounter[targetLevel] = 0;
         if (logger.isTraceEnabled())
@@ -463,7 +330,7 @@ public class LeveledManifest
                 logger.trace("CompactionCounter: {}: {}", j, compactionCounter[j]);
         }
 
-        for (int i = generations.length - 1; i > 0; i--)
+        for (int i = generations.levelCount() - 1; i > 0; i--)
         {
             if (getLevelSize(i) > 0)
             {
@@ -486,9 +353,9 @@ public class LeveledManifest
                         return candidates;
                     Set<SSTableReader> compacting = cfs.getTracker().getCompacting();
                     Range<PartitionPosition> boundaries = new Range<>(min, max);
-                    for (SSTableReader sstable : getLevel(i))
+                    for (SSTableReader sstable : generations.get(i))
                     {
-                        Range<PartitionPosition> r = new 
Range<PartitionPosition>(sstable.first, sstable.last);
+                        Range<PartitionPosition> r = new Range<>(sstable.first, sstable.last);
                         if (boundaries.contains(r) && !compacting.contains(sstable))
                         {
                             logger.info("Adding high-level (L{}) {} to 
candidates", sstable.getSSTableLevel(), sstable);
@@ -506,35 +373,12 @@ public class LeveledManifest
 
     public synchronized int getLevelSize(int i)
     {
-        if (i >= generations.length)
-            throw new ArrayIndexOutOfBoundsException("Maximum valid generation 
is " + (generations.length - 1));
-        return getLevel(i).size();
+        return generations.get(i).size();
     }
 
     public synchronized int[] getAllLevelSize()
     {
-        int[] counts = new int[generations.length];
-        for (int i = 0; i < counts.length; i++)
-            counts[i] = getLevel(i).size();
-        return counts;
-    }
-
-    private void logDistribution()
-    {
-        if (logger.isTraceEnabled())
-        {
-            for (int i = 0; i < generations.length; i++)
-            {
-                if (!getLevel(i).isEmpty())
-                {
-                    logger.trace("L{} contains {} SSTables ({}) in {}",
-                                 i,
-                                 getLevel(i).size(),
-                                 
FBUtilities.prettyPrintMemory(SSTableReader.getTotalBytes(getLevel(i))),
-                                 this);
-                }
-            }
-        }
+        return generations.getAllLevelSize();
     }
 
     @VisibleForTesting
@@ -542,10 +386,15 @@ public class LeveledManifest
     {
         int level = reader.getSSTableLevel();
         assert level >= 0 : reader + " not present in manifest: "+level;
-        generations[level].remove(reader);
+        generations.remove(Collections.singleton(reader));
         return level;
     }
 
+    public synchronized Set<SSTableReader> getSSTables()
+    {
+        return generations.allSSTables();
+    }
+
     private static Set<SSTableReader> overlapping(Collection<SSTableReader> candidates, Iterable<SSTableReader> others)
     {
         assert !candidates.isEmpty();
@@ -591,7 +440,7 @@ public class LeveledManifest
     {
         assert start.compareTo(end) <= 0;
         Set<SSTableReader> overlapped = new HashSet<>();
-        Bounds<Token> promotedBounds = new Bounds<Token>(start, end);
+        Bounds<Token> promotedBounds = new Bounds<>(start, end);
 
         for (Map.Entry<SSTableReader, Bounds<Token>> pair : sstables.entrySet())
         {
@@ -601,20 +450,12 @@ public class LeveledManifest
         return overlapped;
     }
 
-    private static final Predicate<SSTableReader> suspectP = new 
Predicate<SSTableReader>()
-    {
-        public boolean apply(SSTableReader candidate)
-        {
-            return candidate.isMarkedSuspect();
-        }
-    };
-
     private static Map<SSTableReader, Bounds<Token>> genBounds(Iterable<SSTableReader> ssTableReaders)
     {
         Map<SSTableReader, Bounds<Token>> boundsMap = new HashMap<>();
         for (SSTableReader sstable : ssTableReaders)
         {
-            boundsMap.put(sstable, new Bounds<Token>(sstable.first.getToken(), 
sstable.last.getToken()));
+            boundsMap.put(sstable, new Bounds<>(sstable.first.getToken(), sstable.last.getToken()));
         }
         return boundsMap;
     }
@@ -626,14 +467,14 @@ public class LeveledManifest
      */
     private Collection<SSTableReader> getCandidatesFor(int level)
     {
-        assert !getLevel(level).isEmpty();
+        assert !generations.get(level).isEmpty();
         logger.trace("Choosing candidates for L{}", level);
 
         final Set<SSTableReader> compacting = cfs.getTracker().getCompacting();
 
         if (level == 0)
         {
-            Set<SSTableReader> compactingL0 = getCompacting(0);
+            Set<SSTableReader> compactingL0 = getCompactingL0();
 
             PartitionPosition lastCompactingKey = null;
             PartitionPosition firstCompactingKey = null;
@@ -659,7 +500,7 @@ public class LeveledManifest
             // basically screwed, since we expect all or most L0 sstables to overlap with each L1 sstable.
             // So if an L1 sstable is suspect we can't do much besides try anyway and hope for the best.
             Set<SSTableReader> candidates = new HashSet<>();
-            Map<SSTableReader, Bounds<Token>> remaining = 
genBounds(Iterables.filter(getLevel(0), Predicates.not(suspectP)));
+            Map<SSTableReader, Bounds<Token>> remaining = genBounds(Iterables.filter(generations.get(0), Predicates.not(SSTableReader::isMarkedSuspect)));
 
             for (SSTableReader sstable : ageSortedSSTables(remaining.keySet()))
             {
@@ -672,7 +513,7 @@ public class LeveledManifest
 
                 for (SSTableReader newCandidate : overlappedL0)
                 {
-                    if (firstCompactingKey == null || lastCompactingKey == 
null || overlapping(firstCompactingKey.getToken(), 
lastCompactingKey.getToken(), Arrays.asList(newCandidate)).size() == 0)
+                    if (firstCompactingKey == null || lastCompactingKey == null || overlapping(firstCompactingKey.getToken(), lastCompactingKey.getToken(), Collections.singleton(newCandidate)).size() == 0)
                         candidates.add(newCandidate);
                     remaining.remove(newCandidate);
                 }
@@ -691,7 +532,7 @@ public class LeveledManifest
                 // add sstables from L1 that overlap candidates
                 // if the overlapping ones are already busy in a compaction, leave it out.
                 // TODO try to find a set of L0 sstables that only overlaps with non-busy L1 sstables
-                Set<SSTableReader> l1overlapping = overlapping(candidates, 
getLevel(1));
+                Set<SSTableReader> l1overlapping = overlapping(candidates, generations.get(1));
                 if (Sets.intersection(l1overlapping, compacting).size() > 0)
                     return Collections.emptyList();
                 if (!overlapping(candidates, compactingL0).isEmpty())
@@ -704,27 +545,16 @@ public class LeveledManifest
                 return candidates;
         }
 
-        // for non-L0 compactions, pick up where we left off last time
-        Collections.sort(getLevel(level), SSTableReader.sstableComparator);
-        int start = 0; // handles case where the prior compaction touched the 
very last range
-        for (int i = 0; i < getLevel(level).size(); i++)
-        {
-            SSTableReader sstable = getLevel(level).get(i);
-            if (sstable.first.compareTo(lastCompactedKeys[level]) > 0)
-            {
-                start = i;
-                break;
-            }
-        }
-
         // look for a non-suspect keyspace to compact with, starting with where we left off last time,
         // and wrapping back to the beginning of the generation if necessary
-        Map<SSTableReader, Bounds<Token>> sstablesNextLevel = 
genBounds(getLevel(level + 1));
-        for (int i = 0; i < getLevel(level).size(); i++)
+        Map<SSTableReader, Bounds<Token>> sstablesNextLevel = genBounds(generations.get(level + 1));
+        Iterator<SSTableReader> levelIterator = generations.wrappingIterator(level, lastCompactedSSTables[level]);
+        while (levelIterator.hasNext())
         {
-            SSTableReader sstable = getLevel(level).get((start + i) % 
getLevel(level).size());
+            SSTableReader sstable = levelIterator.next();
             Set<SSTableReader> candidates = Sets.union(Collections.singleton(sstable), overlappingWithBounds(sstable, sstablesNextLevel));
-            if (Iterables.any(candidates, suspectP))
+
+            if (Iterables.any(candidates, SSTableReader::isMarkedSuspect))
                 continue;
             if (Sets.intersection(candidates, compacting).isEmpty())
                 return candidates;
@@ -734,10 +564,10 @@ public class LeveledManifest
         return Collections.emptyList();
     }
 
-    private Set<SSTableReader> getCompacting(int level)
+    private Set<SSTableReader> getCompactingL0()
     {
         Set<SSTableReader> sstables = new HashSet<>();
-        Set<SSTableReader> levelSSTables = new HashSet<>(getLevel(level));
+        Set<SSTableReader> levelSSTables = new HashSet<>(generations.get(0));
         for (SSTableReader sstable : cfs.getTracker().getCompacting())
         {
             if (levelSSTables.contains(sstable))
@@ -749,19 +579,14 @@ public class LeveledManifest
     @VisibleForTesting
     List<SSTableReader> ageSortedSSTables(Collection<SSTableReader> candidates)
     {
-        List<SSTableReader> ageSortedCandidates = new ArrayList<>(candidates);
-        Collections.sort(ageSortedCandidates, 
SSTableReader.maxTimestampAscending);
-        return ageSortedCandidates;
+        List<SSTableReader> copy = new ArrayList<>(candidates);
+        copy.sort(SSTableReader.maxTimestampAscending);
+        return ImmutableList.copyOf(copy);
     }
 
     public synchronized Set<SSTableReader>[] getSStablesPerLevelSnapshot()
     {
-        Set<SSTableReader>[] sstablesPerLevel = new Set[generations.length];
-        for (int i = 0; i < generations.length; i++)
-        {
-            sstablesPerLevel[i] = new HashSet<>(generations[i]);
-        }
-        return sstablesPerLevel;
+        return generations.snapshot();
     }
 
     @Override
@@ -770,34 +595,24 @@ public class LeveledManifest
         return "Manifest@" + hashCode();
     }
 
-    public int getLevelCount()
+    public synchronized int getLevelCount()
     {
-        for (int i = generations.length - 1; i >= 0; i--)
+        for (int i = generations.levelCount() - 1; i >= 0; i--)
         {
-            if (getLevel(i).size() > 0)
+            if (generations.get(i).size() > 0)
                 return i;
         }
         return 0;
     }
 
-    public synchronized SortedSet<SSTableReader> getLevelSorted(int level, 
Comparator<SSTableReader> comparator)
-    {
-        return ImmutableSortedSet.copyOf(comparator, getLevel(level));
-    }
-
-    public List<SSTableReader> getLevel(int i)
-    {
-        return generations[i];
-    }
-
     public synchronized int getEstimatedTasks()
     {
         long tasks = 0;
-        long[] estimated = new long[generations.length];
+        long[] estimated = new long[generations.levelCount()];
 
-        for (int i = generations.length - 1; i >= 0; i--)
+        for (int i = generations.levelCount() - 1; i >= 0; i--)
         {
-            List<SSTableReader> sstables = getLevel(i);
+            Set<SSTableReader> sstables = generations.get(i);
             // If there is 1 byte over TBL - (MBL * 1.001), there is still a task left, so we need to round up.
             estimated[i] = (long)Math.ceil((double)Math.max(0L, SSTableReader.getTotalBytes(sstables) - (long)(maxBytesForLevel(i, maxSSTableSizeInBytes) * 1.001)) / (double)maxSSTableSizeInBytes);
             tasks += estimated[i];
@@ -829,17 +644,18 @@ public class LeveledManifest
             assert newLevel > 0;
         }
         return newLevel;
+    }
 
+    synchronized Set<SSTableReader> getLevel(int level)
+    {
+        return ImmutableSet.copyOf(generations.get(level));
     }
 
-    public Iterable<SSTableReader> getAllSSTables()
+    synchronized List<SSTableReader> getLevelSorted(int level, Comparator<SSTableReader> comparator)
     {
-        Set<SSTableReader> sstables = new HashSet<>();
-        for (List<SSTableReader> generation : generations)
-        {
-            sstables.addAll(generation);
-        }
-        return sstables;
+        List<SSTableReader> copy = new ArrayList<>(generations.get(level));
+        copy.sort(comparator);
+        return ImmutableList.copyOf(copy);
     }
 
     public static class CompactionCandidate
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index 2643438..2a4f293 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -245,30 +245,19 @@ public class StandaloneScrubber
         if (strategyManager.getCompactionParams().klass().equals(LeveledCompactionStrategy.class))
         {
             int maxSizeInMB = (int)((cfs.getCompactionStrategyManager().getMaxSSTableBytes()) / (1024L * 1024L));
+            int fanOut = cfs.getCompactionStrategyManager().getLevelFanoutSize();
+            List<SSTableReader> repaired = new ArrayList<>();
+            List<SSTableReader> unrepaired = new ArrayList<>();
 
-            System.out.println("Checking leveled manifest");
-            Predicate<SSTableReader> repairedPredicate = new 
Predicate<SSTableReader>()
+            for (SSTableReader sstable : sstables)
             {
-                @Override
-                public boolean apply(SSTableReader sstable)
-                {
-                    return sstable.isRepaired();
-                }
-            };
-
-            List<SSTableReader> repaired = 
Lists.newArrayList(Iterables.filter(sstables, repairedPredicate));
-            List<SSTableReader> unRepaired = 
Lists.newArrayList(Iterables.filter(sstables, 
Predicates.not(repairedPredicate)));
-
-            LeveledManifest repairedManifest = LeveledManifest.create(cfs, 
maxSizeInMB, cfs.getLevelFanoutSize(), repaired);
-            for (int i = 1; i < repairedManifest.getLevelCount(); i++)
-            {
-                repairedManifest.repairOverlappingSSTables(i);
-            }
-            LeveledManifest unRepairedManifest = LeveledManifest.create(cfs, 
maxSizeInMB, cfs.getLevelFanoutSize(), unRepaired);
-            for (int i = 1; i < unRepairedManifest.getLevelCount(); i++)
-            {
-                unRepairedManifest.repairOverlappingSSTables(i);
+                if (sstable.isRepaired())
+                    repaired.add(sstable);
+                else
+                    unrepaired.add(sstable);
             }
+            LeveledManifest.create(cfs, maxSizeInMB, fanOut, repaired);
+            LeveledManifest.create(cfs, maxSizeInMB, fanOut, unrepaired);
         }
     }
 
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyCQLTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyCQLTest.java
new file mode 100644
index 0000000..9bfa380
--- /dev/null
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyCQLTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Hex;
+
+public class LongLeveledCompactionStrategyCQLTest extends CQLTester
+{
+
+    @Test
+    public void stressTestCompactionStrategyManager() throws ExecutionException, InterruptedException
+    {
+        System.setProperty(Config.PROPERTY_PREFIX + "test.strict_lcs_checks", "true");
+        // flush/compact tons of sstables, invalidate token metadata in a loop to make CSM reload the strategies
+        createTable("create table %s (id int primary key, i text) with compaction = {'class':'LeveledCompactionStrategy', 'sstable_size_in_mb':1}");
+        ExecutorService es = Executors.newSingleThreadExecutor();
+        DatabaseDescriptor.setConcurrentCompactors(8);
+        AtomicBoolean stop = new AtomicBoolean(false);
+        long start = System.currentTimeMillis();
+        try
+        {
+            Random r = new Random();
+            Future<?> writes = es.submit(() -> {
+
+                byte[] b = new byte[1024];
+                while (!stop.get())
+                {
+
+                    for (int i = 0 ; i < 100; i++)
+                    {
+                        try
+                        {
+                            r.nextBytes(b);
+                            String s = Hex.bytesToHex(b);
+                            execute("insert into %s (id, i) values (?,?)", r.nextInt(), s);
+                        }
+                        catch (Throwable throwable)
+                        {
+                            throw new RuntimeException(throwable);
+                        }
+                    }
+                    getCurrentColumnFamilyStore().forceBlockingFlush();
+                    Uninterruptibles.sleepUninterruptibly(r.nextInt(200), TimeUnit.MILLISECONDS);
+                }
+            });
+
+            while(System.currentTimeMillis() - start < TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES))
+            {
+                StorageService.instance.getTokenMetadata().invalidateCachedRings();
+                Uninterruptibles.sleepUninterruptibly(r.nextInt(1000), TimeUnit.MILLISECONDS);
+            }
+
+            stop.set(true);
+            writes.get();
+        }
+        finally
+        {
+            es.shutdown();
+        }
+    }
+}
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index 0d54173..ba3ad44 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -128,7 +128,7 @@ public class LongLeveledCompactionStrategyTest
         int levels = manifest.getLevelCount();
         for (int level = 0; level < levels; level++)
         {
-            List<SSTableReader> sstables = manifest.getLevel(level);
+            Set<SSTableReader> sstables = manifest.getLevel(level);
             // score check
             assert (double) SSTableReader.getTotalBytes(sstables) / 
manifest.maxBytesForLevel(level, 1 * 1024 * 1024) < 1.00;
             // overlap check for levels greater than 0
diff --git a/test/unit/org/apache/cassandra/MockSchema.java 
b/test/unit/org/apache/cassandra/MockSchema.java
index 804bccb..8208000 100644
--- a/test/unit/org/apache/cassandra/MockSchema.java
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@ -48,6 +48,8 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static 
org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
+
 public class MockSchema
 {
     static
@@ -84,6 +86,21 @@ public class MockSchema
 
     public static SSTableReader sstable(int generation, int size, boolean 
keepRef, ColumnFamilyStore cfs)
     {
+        return sstable(generation, size, keepRef, generation, generation, cfs);
+    }
+
+    /**
+     * Creates a mock sstable with an explicit token range and sstable level.
+     * Delegates to the full {@code sstable} overload with a size of 0 and
+     * {@code keepRef} set to false.
+     *
+     * @param generation generation number used for the sstable descriptor
+     * @param firstToken token used to build the reader's first key
+     * @param lastToken token used to build the reader's last key
+     * @param level sstable level recorded in the sstable's stats metadata
+     * @param cfs the column family store the sstable is created for
+     */
+    public static SSTableReader sstableWithLevel(int generation, long 
firstToken, long lastToken, int level, ColumnFamilyStore cfs)
+    {
+        return sstable(generation, 0, false, firstToken, lastToken, level, 
cfs);
+    }
+
+    /**
+     * Creates a mock sstable with an explicit token range at level 0.
+     * Delegates to the overload that additionally takes the sstable level.
+     *
+     * @param firstToken token used to build the reader's first key
+     * @param lastToken token used to build the reader's last key
+     */
+    public static SSTableReader sstable(int generation, int size, boolean 
keepRef, long firstToken, long lastToken, ColumnFamilyStore cfs)
+    {
+        return sstable(generation, size, keepRef, firstToken, lastToken, 0, 
cfs);
+    }
+
+    public static SSTableReader sstable(int generation, int size, boolean 
keepRef, long firstToken, long lastToken, int level, ColumnFamilyStore cfs)
+    {
         Descriptor descriptor = new 
Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(),
                                                cfs.keyspace.getName(),
                                                cfs.getColumnFamilyName(),
@@ -117,12 +134,14 @@ public class MockSchema
         }
         SerializationHeader header = SerializationHeader.make(cfs.metadata, 
Collections.emptyList());
         StatsMetadata metadata = (StatsMetadata) new 
MetadataCollector(cfs.metadata.comparator)
-                                                 
.finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 
0.01f, -1, header)
+                                                 .sstableLevel(level)
+                                                 
.finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 
0.01f, UNREPAIRED_SSTABLE, header)
                                                  .get(MetadataType.STATS);
         SSTableReader reader = SSTableReader.internalOpen(descriptor, 
components, cfs.metadata,
                                                           
RANDOM_ACCESS_READER_FACTORY.sharedCopy(), 
RANDOM_ACCESS_READER_FACTORY.sharedCopy(), indexSummary.sharedCopy(),
                                                           new 
AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL, header);
-        reader.first = reader.last = readerBounds(generation);
+        reader.first = readerBounds(firstToken);
+        reader.last = readerBounds(lastToken);
         if (!keepRef)
             reader.selfRef().release();
         return reader;
@@ -152,9 +171,9 @@ public class MockSchema
         return metadata;
     }
 
-    public static BufferDecoratedKey readerBounds(int generation)
+    public static BufferDecoratedKey readerBounds(long token)
     {
-        return new BufferDecoratedKey(new 
Murmur3Partitioner.LongToken(generation), ByteBufferUtil.EMPTY_BYTE_BUFFER);
+        return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(token), 
ByteBufferUtil.EMPTY_BYTE_BUFFER);
     }
 
     private static File temp(String id)
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
 
b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index b1d467e..1a3ac44 100644
--- 
a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -22,10 +22,14 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import junit.framework.Assert;
 import org.junit.After;
@@ -37,6 +41,7 @@ import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.MockSchema;
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
@@ -157,7 +162,6 @@ public class LeveledCompactionStrategyTest
                 assert groupLevel == tableLevel;
             }
         }
-
     }
 
     /*
@@ -296,7 +300,7 @@ public class LeveledCompactionStrategyTest
             strategy.manifest.remove(s);
             s.descriptor.getMetadataSerializer().mutateLevel(s.descriptor, 6);
             s.reloadSSTableMetadata();
-            strategy.manifest.add(s);
+            strategy.manifest.addSSTables(Collections.singleton(s));
         }
         // verify that all sstables in the changed set is level 6
         for (SSTableReader s : cfs.getLiveSSTables())
@@ -345,14 +349,12 @@ public class LeveledCompactionStrategyTest
         for (SSTableReader sstable : cfs.getLiveSSTables())
             assertFalse(sstable.isRepaired());
 
-        int sstableCount = 0;
-        for (List<SSTableReader> level : unrepaired.manifest.generations)
-            sstableCount += level.size();
+        int sstableCount = unrepaired.manifest.getSSTables().size();
         // we only have unrepaired sstables:
         assertEquals(sstableCount, cfs.getLiveSSTables().size());
 
-        SSTableReader sstable1 = unrepaired.manifest.generations[2].get(0);
-        SSTableReader sstable2 = unrepaired.manifest.generations[1].get(0);
+        SSTableReader sstable1 = 
unrepaired.manifest.getLevel(2).iterator().next();
+        SSTableReader sstable2 = 
unrepaired.manifest.getLevel(1).iterator().next();
 
         
sstable1.descriptor.getMetadataSerializer().mutateRepairedAt(sstable1.descriptor,
 System.currentTimeMillis());
         sstable1.reloadSSTableMetadata();
@@ -360,14 +362,12 @@ public class LeveledCompactionStrategyTest
 
         manager.handleNotification(new 
SSTableRepairStatusChanged(Arrays.asList(sstable1)), this);
 
-        int repairedSSTableCount = 0;
-        for (List<SSTableReader> level : repaired.manifest.generations)
-            repairedSSTableCount += level.size();
+        int repairedSSTableCount = repaired.manifest.getSSTables().size();
         assertEquals(1, repairedSSTableCount);
         // make sure the repaired sstable ends up in the same level in the 
repaired manifest:
-        assertTrue(repaired.manifest.generations[2].contains(sstable1));
+        assertTrue(repaired.manifest.getLevel(2).contains(sstable1));
         // and that it is gone from unrepaired
-        assertFalse(unrepaired.manifest.generations[2].contains(sstable1));
+        assertFalse(unrepaired.manifest.getLevel(2).contains(sstable1));
 
         unrepaired.removeSSTable(sstable2);
         manager.handleNotification(new 
SSTableAddedNotification(singleton(sstable2)), this);
@@ -480,4 +480,218 @@ public class LeveledCompactionStrategyTest
             lastMaxTimeStamp = sstable.getMaxTimestamp();
         }
     }
+
+    /**
+     * Adds sstables whose metadata all claims level 1 and checks where the manifest
+     * puts them: the assertions expect sstables that overlap what is already in L1
+     * (including sharing a boundary token) to end up in L0 with their level reset to 0,
+     * while non-overlapping ones are kept in L1.
+     */
+    @Test
+    public void testAddingOverlapping()
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new 
SizeTieredCompactionStrategyOptions());
+        List<SSTableReader> currentLevel = new ArrayList<>();
+        int gen = 1;
+        currentLevel.add(MockSchema.sstableWithLevel(gen++, 10, 20, 1, cfs));
+        currentLevel.add(MockSchema.sstableWithLevel(gen++, 21, 30, 1, cfs));
+        currentLevel.add(MockSchema.sstableWithLevel(gen++, 51, 100, 1, cfs));
+        // [80, 120] and [90, 150] overlap [51, 100]; only the first three are expected to stay in L1
+        currentLevel.add(MockSchema.sstableWithLevel(gen++, 80, 120, 1, cfs));
+        currentLevel.add(MockSchema.sstableWithLevel(gen++, 90, 150, 1, cfs));
+
+        lm.addSSTables(currentLevel);
+        assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3));
+        assertLevelsEqual(lm.getLevel(0), currentLevel.subList(3, 5));
+
+        List<SSTableReader> newSSTables = new ArrayList<>();
+        // this sstable last token is the same as the first token of L1 above, 
should get sent to L0:
+        newSSTables.add(MockSchema.sstableWithLevel(gen++, 5, 10, 1, cfs));
+        lm.addSSTables(newSSTables);
+        assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3));
+        assertEquals(0, newSSTables.get(0).getSSTableLevel());
+        assertTrue(lm.getLevel(0).containsAll(newSSTables));
+
+        // first token (30) equals the last token of [21, 30] already in L1, expected in L0
+        newSSTables.clear();
+        newSSTables.add(MockSchema.sstableWithLevel(gen++, 30, 40, 1, cfs));
+        lm.addSSTables(newSSTables);
+        assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3));
+        assertEquals(0, newSSTables.get(0).getSSTableLevel());
+        assertTrue(lm.getLevel(0).containsAll(newSSTables));
+
+        // first token (100) equals the last token of [51, 100] already in L1, expected in L0
+        newSSTables.clear();
+        newSSTables.add(MockSchema.sstableWithLevel(gen++, 100, 140, 1, cfs));
+        lm.addSSTables(newSSTables);
+        assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3));
+        assertEquals(0, newSSTables.get(0).getSSTableLevel());
+        assertTrue(lm.getLevel(0).containsAll(newSSTables));
+
+        // mixed batch: [100, 140] still overlaps L1, but [120, 140] does not and is expected to join L1
+        newSSTables.clear();
+        newSSTables.add(MockSchema.sstableWithLevel(gen++, 100, 140, 1, cfs));
+        newSSTables.add(MockSchema.sstableWithLevel(gen++, 120, 140, 1, cfs));
+        lm.addSSTables(newSSTables);
+        List<SSTableReader> newL1 = new ArrayList<>(currentLevel.subList(0, 
3));
+        newL1.add(newSSTables.get(1));
+        assertLevelsEqual(lm.getLevel(1), newL1);
+        // drop the sstable that made it into L1; what remains must be in L0
+        newSSTables.remove(1);
+        assertTrue(newSSTables.stream().allMatch(s -> s.getSSTableLevel() == 
0));
+        assertTrue(lm.getLevel(0).containsAll(newSSTables));
+    }
+
+    /**
+     * Checks overlap handling around an L1 sstable that covers a single token (100):
+     * the assertions expect every added sstable whose range touches token 100 to land
+     * in L0, and sstables that do not touch it to be accepted into L1.
+     */
+    @Test
+    public void singleTokenSSTableTest()
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new 
SizeTieredCompactionStrategyOptions());
+        List<SSTableReader> expectedL1 = new ArrayList<>();
+
+        int gen = 1;
+        // single sstable, single token (100)
+        expectedL1.add(MockSchema.sstableWithLevel(gen++, 100, 100, 1, cfs));
+        lm.addSSTables(expectedL1);
+
+        List<SSTableReader> expectedL0 = new ArrayList<>();
+
+        // should get moved to L0:
+        expectedL0.add(MockSchema.sstableWithLevel(gen++, 99, 101, 1, cfs));
+        expectedL0.add(MockSchema.sstableWithLevel(gen++, 100, 101, 1, cfs));
+        expectedL0.add(MockSchema.sstableWithLevel(gen++, 99, 100, 1, cfs));
+        expectedL0.add(MockSchema.sstableWithLevel(gen++, 100, 100, 1, cfs));
+        lm.addSSTables(expectedL0);
+
+        assertLevelsEqual(expectedL0, lm.getLevel(0));
+        assertTrue(expectedL0.stream().allMatch(s -> s.getSSTableLevel() == 
0));
+        assertLevelsEqual(expectedL1, lm.getLevel(1));
+        assertTrue(expectedL1.stream().allMatch(s -> s.getSSTableLevel() == 
1));
+
+        // should work:
+        expectedL1.add(MockSchema.sstableWithLevel(gen++, 98, 99, 1, cfs));
+        expectedL1.add(MockSchema.sstableWithLevel(gen++, 101, 101, 1, cfs));
+        // element 0 is the [100, 100] sstable that was already added above, so only add the new ones
+        lm.addSSTables(expectedL1.subList(1, expectedL1.size()));
+        assertLevelsEqual(expectedL1, lm.getLevel(1));
+    }
+
+    /**
+     * Randomized test: repeatedly adds batches of randomly generated sstables spread over
+     * {@code levelCount} levels and then verifies that
+     * <ul>
+     *   <li>every sstable found in level N reports N as its sstable level,</li>
+     *   <li>no level above L0 contains sstables with overlapping token ranges,</li>
+     *   <li>the total and per-level sstable counts match what the brute-force
+     *       {@code canAdd} helper predicted.</li>
+     * </ul>
+     * The random seed is part of every failure message so that a failing run can be reproduced.
+     */
+    @Test
+    public void randomMultiLevelAddTest()
+    {
+        int iterations = 100;
+        int levelCount = 8;
+
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions());
+        long seed = System.currentTimeMillis();
+        String seedInfo = "[seed = " + seed + "] ";
+        Random r = new Random(seed);
+        List<SSTableReader> newLevels = generateNewRandomLevels(cfs, 40, levelCount, 0, r);
+
+        // doubles as the next generation offset, so generations stay unique across batches
+        int sstableCount = newLevels.size();
+        lm.addSSTables(newLevels);
+
+        int[] expectedLevelSizes = lm.getAllLevelSize();
+
+        for (int j = 0; j < iterations; j++)
+        {
+            newLevels = generateNewRandomLevels(cfs, 20, levelCount, sstableCount, r);
+            sstableCount += newLevels.size();
+
+            // the prediction has to be computed against the manifest state before the batch is added
+            int[] canAdd = canAdd(lm, newLevels, levelCount);
+            for (int i = 0; i < levelCount; i++)
+                expectedLevelSizes[i] += canAdd[i];
+            lm.addSSTables(newLevels);
+        }
+
+        // and verify no levels overlap
+        int actualSSTableCount = 0;
+        for (int i = 0; i < levelCount; i++)
+        {
+            actualSSTableCount += lm.getLevelSize(i);
+            List<SSTableReader> level = new ArrayList<>(lm.getLevel(i));
+            int lvl = i;
+            assertTrue(seedInfo + "sstable with wrong level found in level " + lvl,
+                       level.stream().allMatch(s -> s.getSSTableLevel() == lvl));
+            if (i > 0)
+            {
+                level.sort(SSTableReader.sstableComparator);
+                SSTableReader prev = null;
+                for (SSTableReader sstable : level)
+                {
+                    if (prev != null && sstable.first.compareTo(prev.last) <= 0)
+                    {
+                        String levelStr = level.stream().map(s -> String.format("[%s, %s]", s.first, s.last)).collect(Collectors.joining(", "));
+                        String overlap = String.format("sstable [%s, %s] overlaps with [%s, %s] in level %d (%s) ", sstable.first, sstable.last, prev.first, prev.last, i, levelStr);
+                        Assert.fail(seedInfo + "overlap in level " + lvl + ": " + overlap);
+                    }
+                    prev = sstable;
+                }
+            }
+        }
+        assertEquals(seedInfo + "wrong total sstable count", sstableCount, actualSSTableCount);
+        for (int i = 0; i < levelCount; i++)
+            assertEquals(seedInfo + "wrong sstable count in level = " + i, expectedLevelSizes[i], lm.getLevel(i).size());
+    }
+
+    /**
+     * Builds a random batch of mock sstables for every level in {@code [0, levelCount)}.
+     * For each level between 1 and {@code maxSSTableCountPerLevel} sstables are created;
+     * their bounds are drawn from {@code [0, 4000)}, sorted, and consumed pairwise as
+     * (first token, last token). Generations are numbered from {@code startGen + 1}.
+     *
+     * @return all generated sstables, ordered by level
+     */
+    private static List<SSTableReader> generateNewRandomLevels(ColumnFamilyStore cfs, int maxSSTableCountPerLevel, int levelCount, int startGen, Random r)
+    {
+        List<SSTableReader> generated = new ArrayList<>();
+        int nextGen = startGen;
+        for (int level = 0; level < levelCount; level++)
+        {
+            int sstablesInLevel = r.nextInt(maxSSTableCountPerLevel) + 1;
+
+            // two bounds per sstable
+            int[] bounds = new int[sstablesInLevel * 2];
+            for (int b = 0; b < bounds.length; b++)
+                bounds[b] = r.nextInt(4000);
+            Arrays.sort(bounds);
+
+            for (int pair = 0; pair < sstablesInLevel; pair++)
+            {
+                nextGen++;
+                int firstToken = bounds[2 * pair];
+                int lastToken = bounds[2 * pair + 1];
+                generated.add(MockSchema.sstableWithLevel(nextGen, firstToken, lastToken, level, cfs));
+            }
+        }
+        return generated;
+    }
+
+    /**
+     * Brute-force prediction of where each new sstable should end up when added to the manifest.
+     *
+     * The new sstables are grouped by the level their metadata reports. Level 0 sstables are
+     * always counted for level 0. For any other level, each sstable is tentatively inserted
+     * into a sorted copy of that level; if any two neighbours then overlap it is taken out
+     * again and counted for level 0, otherwise it stays in the copy and is counted for its level.
+     *
+     * The manifest itself is not modified, only copies of its levels.
+     *
+     * @param lm manifest holding the current levels
+     * @param newSSTables sstables about to be added
+     * @param levelCount number of levels, used to size the result
+     * @return count of expected sstables to add to each level
+     */
+    private static int[] canAdd(LeveledManifest lm, List<SSTableReader> 
newSSTables, int levelCount)
+    {
+        Map<Integer, Collection<SSTableReader>> sstableGroups = new 
HashMap<>();
+        newSSTables.forEach(s -> 
sstableGroups.computeIfAbsent(s.getSSTableLevel(), k -> new 
ArrayList<>()).add(s));
+
+        int[] canAdd = new int[levelCount];
+        for (Map.Entry<Integer, Collection<SSTableReader>> lvlGroup : 
sstableGroups.entrySet())
+        {
+            int level = lvlGroup.getKey();
+            if (level == 0)
+            {
+                // no overlap check is done for level 0
+                canAdd[0] += lvlGroup.getValue().size();
+                continue;
+            }
+
+            // working copy of the level; accepted sstables stay in it for the following candidates
+            List<SSTableReader> newLevel = new ArrayList<>(lm.getLevel(level));
+            for (SSTableReader sstable : lvlGroup.getValue())
+            {
+                newLevel.add(sstable);
+                newLevel.sort(SSTableReader.sstableComparator);
+
+                SSTableReader prev = null;
+                boolean kept = true;
+                for (SSTableReader sst : newLevel)
+                {
+                    // neighbours overlap when the previous last key is >= the next first key
+                    if (prev != null && prev.last.compareTo(sst.first) >= 0)
+                    {
+                        // removing while iterating is only safe because of the immediate break
+                        newLevel.remove(sstable);
+                        kept = false;
+                        break;
+                    }
+                    prev = sst;
+                }
+                if (kept)
+                    canAdd[level] += 1;
+                else
+                    canAdd[0] += 1;
+            }
+        }
+        return canAdd;
+    }
+
+    /**
+     * Asserts that both collections have the same size and contain the same sstables,
+     * ignoring iteration order.
+     */
+    private static void assertLevelsEqual(Collection<SSTableReader> l1, Collection<SSTableReader> l2)
+    {
+        int firstSize = l1.size();
+        int secondSize = l2.size();
+        assertEquals(firstSize, secondSize);
+
+        HashSet<SSTableReader> firstAsSet = new HashSet<>(l1);
+        HashSet<SSTableReader> secondAsSet = new HashSet<>(l2);
+        assertEquals(firstAsSet, secondAsSet);
+    }
 }
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/LeveledGenerationsTest.java 
b/test/unit/org/apache/cassandra/db/compaction/LeveledGenerationsTest.java
new file mode 100644
index 0000000..1f17bf8
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledGenerationsTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.MockSchema;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.BufferDecoratedKey;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static junit.framework.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link LeveledGenerations}: the wrapping iterator over a level and
+ * the per-level bookkeeping.
+ *
+ * As exercised by the assertions below, {@code wrappingIterator(level, lastCompacted)} is
+ * expected to return every sstable of the level exactly once, starting with the first sstable
+ * whose first token is greater than the last token of {@code lastCompacted} and wrapping around
+ * to the beginning of the level; with {@code null} (or nothing after {@code lastCompacted}) it
+ * starts at the beginning.
+ */
+public class LeveledGenerationsTest extends CQLTester
+{
+    @BeforeClass
+    public static void setUp()
+    {
+        DatabaseDescriptor.daemonInitialization();
+        MockSchema.cleanup();
+    }
+
+    /**
+     * Wrapping iteration over levels containing single-token sstables (level 2),
+     * a single sstable (level 3) and two sstables (level 4).
+     */
+    @Test
+    public void testWrappingIterable()
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+
+        LeveledGenerations gens = new LeveledGenerations();
+
+        // level 2: ten sstables, sstable i covers exactly token i
+        for (int i = 0; i < 10; i++)
+        {
+            SSTableReader sstable = MockSchema.sstable(i, 5, true, i, i, 2, cfs);
+            gens.addAll(Collections.singleton(sstable));
+        }
+        int gen = 10;
+        assertIter(gens.wrappingIterator(2, sst(++gen, cfs, 5, 5)),
+                   6, 5, 10);
+        assertIter(gens.wrappingIterator(2, null),
+                   0, 9, 10);
+        assertIter(gens.wrappingIterator(2, sst(++gen, cfs, -10, 0)),
+                   1, 0, 10);
+        assertIter(gens.wrappingIterator(2, sst(++gen, cfs, 5, 9)),
+                   0, 9, 10);
+        assertIter(gens.wrappingIterator(2, sst(++gen, cfs, 0, 1000)),
+                   0, 9, 10);
+
+        // level 3: a single sstable [5, 10], always returned whatever lastCompacted is
+        gens.addAll(Collections.singleton(MockSchema.sstable(100, 5, true, 5, 10, 3, cfs)));
+        assertIter(gens.wrappingIterator(3, sst(++gen, cfs, -10, 0)),
+                   5, 5, 1);
+        assertIter(gens.wrappingIterator(3, sst(++gen, cfs, 0, 100)),
+                   5, 5, 1);
+
+        // level 4: two sstables, [5, 10] and [40, 50]
+        gens.addAll(Collections.singleton(MockSchema.sstable(200, 5, true, 5, 10, 4, cfs)));
+        gens.addAll(Collections.singleton(MockSchema.sstable(201, 5, true, 40, 50, 4, cfs)));
+        assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 0, 0)),
+                   5, 40, 2);
+        assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 0, 5)),
+                   40, 5, 2);
+        assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 7, 8)),
+                   40, 5, 2);
+        assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 39, 39)),
+                   40, 5, 2);
+        assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 40, 40)),
+                   5, 40, 2);
+        assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 100, 1000)),
+                   5, 40, 2);
+    }
+
+    /**
+     * Wrapping iteration over a level 2 holding the wider sstables [0, 50], [51, 100] and [150, 200].
+     */
+    @Test
+    public void testWrappingIterableWiderSSTables()
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        LeveledGenerations generations = new LeveledGenerations();
+        int gen = 0;
+        generations.addAll(Lists.newArrayList(
+            sst(++gen, cfs, 0, 50),
+            sst(++gen, cfs, 51, 100),
+            sst(++gen, cfs, 150, 200)));
+
+        assertIter(generations.wrappingIterator(2, sst(++gen, cfs, -100, -50)),
+                   0, 150, 3);
+
+        assertIter(generations.wrappingIterator(2, sst(++gen, cfs, 0, 40)),
+                   51, 0, 3);
+        assertIter(generations.wrappingIterator(2, sst(++gen, cfs, 0, 50)),
+                   51, 0, 3);
+        assertIter(generations.wrappingIterator(2, sst(++gen, cfs, 0, 51)),
+                   150, 51, 3);
+
+        assertIter(generations.wrappingIterator(2, sst(++gen, cfs, 100, 149)),
+                   150, 51, 3);
+        assertIter(generations.wrappingIterator(2, sst(++gen, cfs, 100, 300)),
+                   0, 150, 3);
+    }
+
+    /**
+     * The wrapping iterator of a level without sstables must be empty, with or without a last compacted sstable.
+     */
+    @Test
+    public void testEmptyLevel()
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        LeveledGenerations generations = new LeveledGenerations();
+        assertFalse(generations.wrappingIterator(3, sst(0, cfs, 0, 10)).hasNext());
+        assertFalse(generations.wrappingIterator(3, null).hasNext());
+    }
+
+    /**
+     * Adds one sstable to each of the {@code MAX_LEVEL_COUNT} levels, checks each level holds an sstable
+     * of that level, and checks that out-of-range levels are rejected.
+     */
+    @Test
+    public void testFillLevels()
+    {
+        LeveledGenerations generations = new LeveledGenerations();
+
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        for (int i = 0; i < LeveledGenerations.MAX_LEVEL_COUNT; i++)
+            generations.addAll(Collections.singleton(MockSchema.sstableWithLevel(i, i, i, i, cfs)));
+
+        for (int i = 0; i < generations.levelCount(); i++)
+            assertEquals(i, generations.get(i).iterator().next().getSSTableLevel());
+
+        assertEquals(9, generations.levelCount());
+
+        try
+        {
+            generations.get(9);
+            fail("don't have 9 generations");
+        }
+        catch (ArrayIndexOutOfBoundsException expected)
+        {
+            // expected: level 9 is out of range
+        }
+        try
+        {
+            generations.get(-1);
+            fail("don't have -1 generations");
+        }
+        catch (ArrayIndexOutOfBoundsException expected)
+        {
+            // expected: negative levels are out of range
+        }
+    }
+
+    /**
+     * Drains {@code iter} and asserts the number of sstables returned as well as the first token
+     * of the first and of the last sstable returned.
+     *
+     * @param first expected first token of the first sstable returned
+     * @param last expected first token of the last sstable returned
+     * @param expectedCount expected number of sstables returned
+     */
+    private void assertIter(Iterator<SSTableReader> iter, long first, long last, int expectedCount)
+    {
+        List<SSTableReader> drained = Lists.newArrayList(iter);
+        assertEquals(expectedCount, drained.size());
+        assertEquals(dk(first).getToken(), first(drained).first.getToken());
+        assertEquals(dk(last).getToken(), last(drained).first.getToken()); // we sort by first token, so this is the first token of the last sstable in iter
+    }
+
+    /** Returns the last element of {@code iter}. */
+    private SSTableReader last(Iterable<SSTableReader> iter)
+    {
+        return Iterables.getLast(iter);
+    }
+
+    /** Returns the first element of {@code iter}, failing with a RuntimeException if there is none. */
+    private SSTableReader first(Iterable<SSTableReader> iter)
+    {
+        SSTableReader first = Iterables.getFirst(iter, null);
+        if (first == null)
+            throw new RuntimeException("expected at least one sstable but the iterable was empty");
+        return first;
+    }
+
+    /** Builds a decorated key with token {@code x}; only its token is used by the assertions. */
+    private DecoratedKey dk(long x)
+    {
+        return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(x), ByteBufferUtil.bytes(x));
+    }
+
+    /** Creates a mock sstable in level 2 covering tokens {@code [first, last]}. */
+    private SSTableReader sst(int gen, ColumnFamilyStore cfs, long first, long last)
+    {
+        return MockSchema.sstable(gen, 5, true, first, last, 2, cfs);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to