This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit fa7659253f786284d33f840199dc26602d03363a
Merge: 91bcbb2 f41ea9f
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Wed Sep 16 11:05:53 2020 +0200

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   2 +
 NEWS.txt                                           |   2 +
 .../db/compaction/AbstractCompactionStrategy.java  |  21 +-
 .../db/compaction/AbstractStrategyHolder.java      |   6 +-
 .../db/compaction/CompactionStrategyManager.java   | 206 +++++-------
 .../db/compaction/LeveledCompactionStrategy.java   |  29 +-
 .../db/compaction/LeveledGenerations.java          | 310 ++++++++++++++++++
 .../cassandra/db/compaction/LeveledManifest.java   | 354 +++++----------------
 .../apache/cassandra/tools/StandaloneScrubber.java |  26 +-
 .../LongLeveledCompactionStrategyCQLTest.java      |  92 ++++++
 .../LongLeveledCompactionStrategyTest.java         |   2 +-
 .../compaction/LeveledCompactionStrategyTest.java  | 235 +++++++++++++-
 .../db/compaction/LeveledGenerationsTest.java      | 199 ++++++++++++
 .../org/apache/cassandra/schema/MockSchema.java    |  11 +
 14 files changed, 1043 insertions(+), 452 deletions(-)

diff --cc CHANGES.txt
index 5d8a9fb,117fa96..3c4a810
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,40 -1,9 +1,42 @@@
 -3.11.9
 +4.0-beta3
 + * Allow zero padding in timestamp serialization (CASSANDRA-16105)
 + * Add byte array backed cells (CASSANDRA-15393)
 + * Correctly handle pending ranges with adjacent range movements (CASSANDRA-14801)
 + * Avoid adding localhost when streaming trivial ranges (CASSANDRA-16099)
 + * Add nodetool getfullquerylog (CASSANDRA-15988)
 + * Fix yaml format and alignment in tpstats (CASSANDRA-11402)
 + * Avoid trying to keep track of RTs for endpoints we won't write to during read repair (CASSANDRA-16084)
 + * When compaction gets interrupted, the exception should include the compactionId (CASSANDRA-15954)
 + * Make Table/Keyspace Metric Names Consistent With Each Other (CASSANDRA-15909)
 + * Mutating sstable component may race with entire-sstable-streaming(ZCS) causing checksum validation failure (CASSANDRA-15861)
++Merged from 3.11:
+  * Make sure LCS handles duplicate sstable added/removed notifications correctly (CASSANDRA-14103)
  
 -3.11.8
 +4.0-beta2
 + * Add additional incremental repair visibility to nodetool repair_admin (CASSANDRA-14939)
 + * Always access system properties and environment variables via the new CassandraRelevantProperties and CassandraRelevantEnv classes (CASSANDRA-15876)
 + * Remove deprecated HintedHandOffManager (CASSANDRA-15939)
 + * Prevent repair from overrunning compaction (CASSANDRA-15817)
 + * Fix cqlsh COPY functions in Python 3.8 on Mac (CASSANDRA-16053)
 + * Strip comment blocks from cqlsh input before processing statements (CASSANDRA-15802)
 + * Fix unicode chars error input (CASSANDRA-15990)
 + * Improved testability for CacheMetrics and ChunkCacheMetrics (CASSANDRA-15788)
 + * Handle errors in StreamSession#prepare (CASSANDRA-15852)
 + * FQL replay should have options to ignore DDL statements (CASSANDRA-16039)
 + * Remove COMPACT STORAGE internals (CASSANDRA-13994)
 + * Make TimestampSerializer accept fractional seconds of varying precision (CASSANDRA-15976)
 + * Improve cassandra-stress logging when using a profile file that doesn't exist (CASSANDRA-14425)
 + * Improve logging for socket connection/disconnection (CASSANDRA-15980)
 + * Throw FSWriteError upon write failures in order to apply DiskFailurePolicy (CASSANDRA-15928)
 + * Forbid altering UDTs used in partition keys (CASSANDRA-15933)
 + * Fix version parsing logic when upgrading from 3.0 (CASSANDRA-15973)
 + * Optimize NoSpamLogger use in hot paths (CASSANDRA-15766)
 + * Verify sstable components on startup (CASSANDRA-15945)
 + * Resolve JMX output inconsistencies from CASSANDRA-7544 storage-port-configurable-per-node (CASSANDRA-15937)
 +Merged from 3.11:
   * Correctly interpret SASI's `max_compaction_flush_memory_in_mb` setting in megabytes not bytes (CASSANDRA-16071)
   * Fix short read protection for GROUP BY queries (CASSANDRA-15459)
 + * stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up (CASSANDRA-15191)
   * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857)
  Merged from 3.0:
   * Fix gossip shutdown order (CASSANDRA-15816)
diff --cc NEWS.txt
index 7f20a7d,b3246b5..4b5264f
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -33,244 -42,21 +33,246 @@@ restore snapshots created with the prev
  'sstableloader' tool. You can upgrade the file format of your snapshots
  using the provided 'sstableupgrade' tool.
  
 -3.11.9
 -======
 +4.0
 +===
 +
 +New features
 +------------
 +    - Nodes will now bootstrap all intra-cluster connections at startup by default and wait
 +      10 seconds for all but one node in the local data center to be connected and marked
 +      UP in gossip. This prevents nodes from coordinating requests and failing because they
 +      aren't able to connect to the cluster fast enough. block_for_peers_timeout_in_secs in
 +      cassandra.yaml can be used to configure how long to wait (or whether to wait at all)
 +      and block_for_peers_in_remote_dcs can be used to also block on all but one node in
 +      each remote DC as well. See CASSANDRA-14297 and CASSANDRA-13993 for more information.
 +    - *Experimental* support for Transient Replication and Cheap Quorums introduced by CASSANDRA-14404.
 +      The intended audience for this functionality is expert users of Cassandra who are prepared
 +      to validate every aspect of the database for their application and deployment practices. Future
 +      releases of Cassandra will make this feature suitable for a wider audience.
 +    - *Experimental* support for Java 11 has been added. JVM options that differ between or are
 +      specific to Java 8 and 11 have been moved from jvm.options into jvm8.options and jvm11.options.
 +      IMPORTANT: Running C* on Java 11 is *experimental*; do so at your own risk.
 +    - LCS now respects the max_threshold parameter when compacting - this was hard coded to 32
 +      before, but now it is possible to do bigger compactions when compacting from L0 to L1.
 +      This also applies to STCS compactions in L0 - if there are more than 32 sstables in L0,
 +      we will compact at most max_threshold sstables in an L0 STCS compaction. See CASSANDRA-14388
 +      for more information.
 +    - There is now an option to automatically upgrade sstables after a Cassandra upgrade; enable it
 +      either in `cassandra.yaml:automatic_sstable_upgrade` or via JMX during runtime. See
 +      CASSANDRA-14197.
 +    - `nodetool refresh` has been deprecated in favour of `nodetool import` - see CASSANDRA-6719
 +      for details.
 +    - An experimental option to compare all merkle trees together has been added - for example, in
 +      a 3 node cluster with 2 replicas identical and 1 out-of-date, with this option enabled, the
 +      out-of-date replica will only stream a single copy from an up-to-date replica. Enable it by adding
 +      "-os" to nodetool repair. See CASSANDRA-3200.
 +    - The currentTimestamp, currentDate, currentTime and currentTimeUUID functions have been added.
 +      See CASSANDRA-13132.
 +    - Support for arithmetic operations between `timestamp`/`date` and `duration` has been added.
 +      See CASSANDRA-11936.
 +    - Support for arithmetic operations on numbers has been added. See CASSANDRA-11935.
 +    - Preview the expected streaming required for a repair (nodetool repair --preview), and validate the
 +      consistency of repaired data between nodes (nodetool repair --validate). See CASSANDRA-13257.
 +    - Support for selecting Map values and Set elements has been added for SELECT queries. See CASSANDRA-7396.
 +    - Change-Data-Capture has been modified to make CommitLogSegments available
 +      immediately upon creation via hard-linking the files. This means that incomplete
 +      segments will be available in cdc_raw rather than fully flushed. See documentation
 +      and CASSANDRA-12148 for more detail.
 +    - The initial build of materialized views can be parallelized. The number of concurrent builder
 +      threads is specified by the property `cassandra.yaml:concurrent_materialized_view_builders`.
 +      This property can be modified at runtime through both JMX and the new `setconcurrentviewbuilders`
 +      and `getconcurrentviewbuilders` nodetool commands. See CASSANDRA-12245 for more details.
 +    - There is now a binary full query log based on Chronicle Queue that can be controlled using
 +      nodetool enablefullquerylog, disablefullquerylog, and resetfullquerylog. The log
 +      contains all queries invoked, the approximate time they were invoked, any parameters necessary
 +      to bind wildcard values, and all query options. A human readable version of the log can be
 +      dumped or tailed using the new bin/fqltool utility. The full query log is designed to be safe
 +      to use in production and limits utilization of heap memory and disk space with limits
 +      you can specify when enabling the log.
 +      See nodetool and fqltool help text for more information.
 +    - SSTableDump now supports the -l option to output each partition as its own JSON object.
 +      See CASSANDRA-13848 for more detail.
 +    - A metric for coordinator writes per table has been added. See CASSANDRA-14232.
 +    - Nodetool cfstats now has options to sort by various metrics as well as limit results.
 +    - Operators can restrict login user activity to one or more datacenters. See `network_authorizer`
 +      in cassandra.yaml, and the docs for create and alter role statements. CASSANDRA-13985
 +    - Roles altered from login=true to login=false will prevent existing connections from executing any
 +      statements after the cache has been refreshed. CASSANDRA-13985
 +    - Support for audit logging of database activity. If enabled, logs every incoming
 +      CQL command request and authentication (successful as well as unsuccessful login) to a node.
 +    - Faster streaming of entire SSTables using ZeroCopy APIs. If enabled, Cassandra will stream
 +      entire SSTables, significantly speeding up transfers. Any streaming related operations will see
 +      a corresponding improvement. See CASSANDRA-14556.
 +    - NetworkTopologyStrategy now supports auto-expanding the replication_factor
 +      option into all available datacenters at CREATE or ALTER time. For example,
 +      specifying replication_factor: 3 translates to three replicas in every
 +      datacenter. This auto-expansion will _only add_ datacenters for safety
 +      (see the sketch after this feature list). See CASSANDRA-14303 for more details.
 +    - Added Python 3 support so cqlsh and cqlshlib are now compatible with Python 2.7 and Python 3.6.
 +      Added a --python option to cqlsh so users can specify the path to their chosen Python interpreter.
 +      See CASSANDRA-10190 for details.
 +    - Support for server side DESCRIBE statements has been added. See CASSANDRA-14825.
 +
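A minimal Java sketch of the replication_factor auto-expansion rule described
above, under the assumption that expansion simply fills in every known
datacenter and keeps explicit per-DC overrides (the class and method names
here are hypothetical, not the real NetworkTopologyStrategy code):

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    class RfAutoExpandDemo
    {
        // Expand a bare replication_factor into one entry per known DC.
        // Explicit per-DC settings win, and no DC entry is ever removed,
        // mirroring the "only add datacenters" safety rule above.
        static Map<String, Integer> expand(int rf, List<String> dcs, Map<String, Integer> explicit)
        {
            Map<String, Integer> expanded = new LinkedHashMap<>();
            for (String dc : dcs)
                expanded.put(dc, rf);
            expanded.putAll(explicit); // keep explicit overrides
            return expanded;
        }

        public static void main(String[] args)
        {
            // replication_factor: 3 with an explicit dc2: 5 override
            System.out.println(expand(3, List.of("dc1", "dc2"), Map.of("dc2", 5)));
            // prints {dc1=3, dc2=5}
        }
    }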
  Upgrading
  ---------
 -   - Custom compaction strategies must handle getting sstables added/removed notifications for
 -     sstables already added/removed - see CASSANDRA-14103 for details. This has been a requirement
 -     for correct operation since 3.11.0 due to an issue in CompactionStrategyManager.
 +    - Sstables for tables using a frozen UDT written by C* 3.0 appear as corrupted.
  
 -3.11.7
 -======
 +      Background: The serialization-header in the -Statistics.db sstable component contains the type information
 +      of the table columns. C* 3.0 wrote incorrect type information for frozen UDTs by omitting the
 +      "frozen" information. Non-frozen UDTs were introduced by CASSANDRA-7423 in C* 3.6. Since then, the missing
 +      "frozen" information leads to deserialization issues that result in CorruptSSTableExceptions, and potentially other
 +      exceptions as well.
  
 -Upgrading
 ----------
 -    - Nothing specific to this release, but please see previous upgrading sections,
 -      especially if you are upgrading from 3.0.
 +      As a mitigation, the sstable serialization-headers are rewritten once, when an upgrade from C* 3.0 is detected,
 +      to contain the missing "frozen" information for UDTs. This migration does not touch snapshots or backups.
 +
 +      The sstablescrub tool now performs a check of the sstable serialization-header against the schema. A mismatch of
 +      the types in the serialization-header and the schema will cause sstablescrub to error out and stop by default.
 +      See the new `-e` option. `-e off` disables the new validation code. `-e fix` or `-e fix-only`, e.g.
 +      `sstablescrub -e fix keyspace table`, will validate the serialization-header, rewrite the non-frozen UDTs
 +      in the serialization-header to frozen UDTs if that matches the schema, and continue with scrub.
 +      See `sstablescrub -h`.
 +      (CASSANDRA-15035)
 +    - CASSANDRA-13241 lowered the default chunk_length_in_kb for compressed tables from
 +      64kb to 16kb. For highly compressible data this can have a noticeable impact
 +      on space utilization. You may want to consider manually specifying this value.
 +    - Additional columns have been added to system_distributed.repair_history,
 +      system_traces.sessions and system_traces.events. As a result, select queries
 +      against these tables - including queries against tracing tables performed
 +      automatically by the drivers and cqlsh - will fail and generate an error in the log
 +      during upgrade when the cluster is mixed version. On the 3.x side this will also lead
 +      to broken internode connections and lost messages.
 +      Cassandra versions 3.0.20 and 3.11.6 pre-add these columns (see CASSANDRA-15385),
 +      so please make sure to upgrade to those versions or higher before upgrading to
 +      4.0, so that query tracing does not cause any issues during the upgrade to 4.0.
 +    - Timestamp ties between values resolve differently: if either value has a TTL,
 +      this value always wins. This is to provide consistent reconciliation before
 +      and after the value expires into a tombstone (see the sketch after this list).
 +    - Cassandra 4.0 removed support for COMPACT STORAGE tables. All compact tables
 +      have to be migrated using the `ALTER ... DROP COMPACT STORAGE` statement in 3.0/3.11.
 +      Starting with 4.0, Cassandra will not start if flags indicate that the table is non-CQL.
 +      Syntax for creating compact tables is also deprecated.
 +    - Support for legacy auth tables in the system_auth keyspace (users,
 +      permissions, credentials) and the migration code has been removed. Migration
 +      of these legacy auth tables must have been completed before the upgrade to
 +      4.0 and the legacy tables must have been removed. See the 'Upgrading' section
 +      for version 2.2 for migration instructions.
 +    - Cassandra 4.0 removed support for the deprecated Thrift interface. Amongst
 +      other things, this implies the removal of all yaml options related to thrift
 +      ('start_rpc', rpc_port, ...).
 +    - Cassandra 4.0 removed support for any pre-3.0 format. This means you
 +      cannot upgrade from a 2.x version to 4.0 directly; you have to upgrade to
 +      a 3.0.x/3.x version first (and run upgradesstables). In particular, this
 +      means Cassandra 4.0 cannot load or read pre-3.0 sstables in any way: you
 +      will need to upgrade those sstables in 3.0.x/3.x first.
 +    - Upgrades from 3.0.x or 3.x are supported since 3.0.13 or 3.11.0; previous
 +      versions will cause issues during rolling upgrades (CASSANDRA-13274).
 +    - Cassandra will no longer allow invalid keyspace replication options, such
 +      as invalid datacenter names for NetworkTopologyStrategy. Operators MUST
 +      add new nodes to a datacenter before they can set ALTER or CREATE
 +      keyspace replication policies using that datacenter. Existing keyspaces
 +      will continue to operate, but CREATE and ALTER will validate that all
 +      datacenters specified exist in the cluster.
 +    - Cassandra 4.0 fixes a problem with incremental repair which caused repaired
 +      data to be inconsistent between nodes. The fix changes the behavior of both
 +      full and incremental repairs. For full repairs, data is no longer marked
 +      repaired. For incremental repairs, anticompaction is run at the beginning
 +      of the repair, instead of at the end. If incremental repair was being used
 +      prior to upgrading, a full repair should be run after upgrading to resolve
 +      any inconsistencies.
 +    - Config option index_interval has been removed (it was deprecated since 2.0)
 +    - Deprecated repair JMX APIs are removed.
 +    - The version of snappy-java has been upgraded to 1.1.2.6
 +    - The minimum value for internode message timeouts is 10ms. Previously, any
 +      positive value was allowed. See cassandra.yaml entries like
 +      read_request_timeout_in_ms for more details.
 +    - Cassandra 4.0 allows a single port to be used for both secure and insecure
 +      connections between cassandra nodes (CASSANDRA-10404). See the yaml for
 +      specific property changes, and see the security doc for full details.
 +    - Due to the parallelization of the initial build of materialized views,
 +      the per token range view building status is stored in the new table
 +      `system.view_builds_in_progress`. The old table `system.views_builds_in_progress`
 +      is no longer used and can be removed. See CASSANDRA-12245 for more details.
 +    - Config option commitlog_sync_batch_window_in_ms has been deprecated as its
 +      documentation has been incorrect and the setting itself near useless.
 +      Batch mode remains a valid commit log mode, however.
 +    - There is a new commit log mode, group, which is similar to batch mode
 +      but blocks for up to a configurable number of milliseconds between disk flushes.
 +    - nodetool clearsnapshot now requires the --all flag to remove all snapshots.
 +      Previous behavior would delete all snapshots by default.
 +    - Nodes are now identified by a combination of IP, and storage port.
 +      Existing JMX APIs, nodetool, and system tables continue to work
 +      and accept/return just an IP, but there is a new
 +      version of each that works with the full unambiguous identifier.
 +      You should prefer these over the deprecated ambiguous versions that only
 +      work with an IP. This was done to support multiple instances per IP.
 +      Additionally we are moving to only using a single port for encrypted and
 +      unencrypted traffic and if you want multiple instances per IP you must
 +      first switch encrypted traffic to the storage port and not a separate
 +      encrypted port. If you want to use multiple instances per IP
 +      with SSL you will need to use StartTLS on storage_port and set
 +      outgoing_encrypted_port_source to gossip so outbound connections
 +      know what port to connect to for each instance. Before changing
 +      storage port or native port at nodes you must first upgrade the entire cluster
 +      and clients to 4.0 so they can handle the port not being consistent across
 +      the cluster.
 +    - Names of AWS regions/availability zones have been cleaned up to more correctly
 +      match the Amazon names. There is now a new option in conf/cassandra-rackdc.properties
 +      that lets users enable the correct names for new clusters, or use the legacy
 +      names for existing clusters. See conf/cassandra-rackdc.properties for details.
 +    - Background repair has been removed. dclocal_read_repair_chance and
 +      read_repair_chance table options have been removed and are now rejected.
 +      See CASSANDRA-13910 for details.
 +    - Internode TCP connections that do not ack segments for 30s will now
 +      be automatically detected and closed via the Linux TCP_USER_TIMEOUT
 +      socket option. This should be exceedingly rare, but AWS networks (and
 +      other stateful firewalls) apparently suffer from this issue. You can
 +      tune the timeouts on TCP connection and segment ack via the
 +      `cassandra.yaml:internode_tcp_connect_timeout_in_ms` and
 +      `cassandra.yaml:internode_tcp_user_timeout_in_ms` options respectively.
 +      See CASSANDRA-14358 for details.
 +    - repair_session_space_in_mb setting has been added to cassandra.yaml to allow operators to reduce
 +      merkle tree size if repair is creating too much heap pressure. The repair_session_max_tree_depth
 +      setting added in 3.0.19 and 3.11.5 is deprecated in favor of this setting. See CASSANDRA-14096.
 +    - The flags 'enable_materialized_views' and 'enable_sasi_indexes' in cassandra.yaml
 +      have been set as false by default. Operators should modify them to allow the
 +      creation of new views and SASI indexes; the existing ones will continue working.
 +      See CASSANDRA-14866 for details.
 +    - CASSANDRA-15216 - The flag 'cross_node_timeout' has been set as true by default.
 +      This change is done under the assumption that users have set up NTP on
 +      their clusters or otherwise synchronize their clocks, and that clocks are
 +      mostly in sync, since this is a requirement for general correctness of
 +      last write wins.
 +    - CASSANDRA-15257 removed the joda time dependency.  Any time formats
 +      passed will now need to conform to java.time.format.DateTimeFormatter.
 +      Most notably, days and months must be two digits, and years exceeding
 +      four digits need to be prefixed with a plus or minus sign.
 +    - cqlsh now returns a non-zero code in case of errors. This is a backward incompatible change so it may
 +      break existing scripts that rely on the current behavior. See CASSANDRA-15623 for more details.
 +    - Updated the default compaction_throughput_mb_per_sec to 64. The original
 +      default (16) was meant for spinning disk volumes.  See CASSANDRA-14902 for details.
++    - Custom compaction strategies must now handle getting sstables added/removed notifications for
++      sstables already added/removed - see CASSANDRA-14103 for details (a sketch of the required idempotency follows this list).
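      A minimal sketch of the idempotency CASSANDRA-14103 asks of custom
      strategies, using a plain HashSet and String stand-ins instead of the
      real AbstractCompactionStrategy/SSTableReader types (assumption: the
      strategy tracks its sstables in a set-like structure):

          import java.util.HashSet;
          import java.util.Set;

          class IdempotentTrackerDemo
          {
              private final Set<String> sstables = new HashSet<>();

              void addSSTable(String sstable)
              {
                  // Set.add is a no-op for an sstable already tracked,
                  // so a duplicate "added" notification is harmless.
                  sstables.add(sstable);
              }

              void removeSSTable(String sstable)
              {
                  // Removing an already-removed sstable must not throw
                  // or corrupt internal state.
                  sstables.remove(sstable);
              }

              public static void main(String[] args)
              {
                  IdempotentTrackerDemo t = new IdempotentTrackerDemo();
                  t.addSSTable("sstable-1");
                  t.addSSTable("sstable-1");    // duplicate add: still one entry
                  t.removeSSTable("sstable-1");
                  t.removeSSTable("sstable-1"); // duplicate remove: no exception
                  System.out.println(t.sstables.size()); // prints 0
              }
          }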
 +
 +
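For the timestamp tie-break rule in the Upgrading notes above, a small
self-contained sketch of the reconciliation order (Value and its fields are
hypothetical stand-ins for the real cell implementation, and the final
payload comparison is only a simplified placeholder tie-break):

    class TieBreakDemo
    {
        static final class Value
        {
            final long timestamp;
            final boolean hasTtl;
            final String payload;

            Value(long timestamp, boolean hasTtl, String payload)
            {
                this.timestamp = timestamp;
                this.hasTtl = hasTtl;
                this.payload = payload;
            }
        }

        static Value reconcile(Value a, Value b)
        {
            // a higher timestamp still wins outright
            if (a.timestamp != b.timestamp)
                return a.timestamp > b.timestamp ? a : b;
            // timestamp tie: a value carrying a TTL always wins, so the
            // outcome is the same before and after it expires into a tombstone
            if (a.hasTtl != b.hasTtl)
                return a.hasTtl ? a : b;
            // simplified placeholder for the remaining tie-break
            return a.payload.compareTo(b.payload) >= 0 ? a : b;
        }

        public static void main(String[] args)
        {
            Value live = new Value(10, false, "live");
            Value expiring = new Value(10, true, "expiring");
            System.out.println(reconcile(live, expiring).payload); // prints "expiring"
        }
    }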
 +Deprecation
 +-----------
 +
 +    - The JMX MBean org.apache.cassandra.db:type=BlacklistedDirectories has been
 +      deprecated in favor of org.apache.cassandra.db:type=DisallowedDirectories
 +      and will be removed in a subsequent major version.
 +
 +
 +Materialized Views
 +------------------
 +    - Following a discussion regarding concerns about the design and safety of Materialized Views, the C* development
 +      community no longer recommends them for production use, and considers them experimental. Warning messages will
 +      now be logged when they are created. (See https://www.mail-archive.com/dev@cassandra.apache.org/msg11511.html)
 +    - An 'enable_materialized_views' flag has been added to cassandra.yaml to allow operators to prevent creation of
 +      views.
 +    - CREATE MATERIALIZED VIEW syntax has become stricter. Partition key columns are no longer implicitly considered
 +      to be NOT NULL, and no base primary key columns get automatically included in the view definition. You have to
 +      specify them explicitly now.
  
  3.11.6
  ======
diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index c764d6e,4298be8..0b37c22
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@@ -273,33 -336,11 +284,39 @@@ public abstract class AbstractCompactio
              addSSTable(sstable);
      }
  
+     /**
+      * Removes sstable from the strategy, implementations must be able to handle the sstable having already been removed.
+      */
      public abstract void removeSSTable(SSTableReader sstable);
  
++    /**
++     * Removes sstables from the strategy, implementations must be able to handle the sstables having already been removed.
++     */
 +    public void removeSSTables(Iterable<SSTableReader> removed)
 +    {
 +        for (SSTableReader sstable : removed)
 +            removeSSTable(sstable);
 +    }
 +
 +    /**
 +     * Returns the sstables managed by this strategy instance
 +     */
 +    @VisibleForTesting
 +    protected abstract Set<SSTableReader> getSSTables();
 +
 +    /**
 +     * Called when the metadata has changed for an sstable - for example if the level changed
 +     *
 +     * Not called when repair status changes (which is also metadata), because this results in the
 +     * sstable getting removed from the compaction strategy instance.
 +     *
 +     * @param oldMetadata
 +     * @param sstable
 +     */
 +    public void metadataChanged(StatsMetadata oldMetadata, SSTableReader sstable)
 +    {
 +    }
 +
      public static class ScannerList implements AutoCloseable
      {
          public final List<ISSTableScanner> scanners;
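The metadataChanged hook added above exists so a strategy keyed on mutable
sstable metadata (such as the LCS level) can re-bucket an sstable when that
metadata changes. A simplified, runnable sketch of such re-bucketing, using
String/int stand-ins for SSTableReader/StatsMetadata (the class below is
illustrative, not the real manifest code):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class LevelBucketsDemo
    {
        private final Map<Integer, Set<String>> byLevel = new HashMap<>();

        void add(String sstable, int level)
        {
            byLevel.computeIfAbsent(level, k -> new HashSet<>()).add(sstable);
        }

        // analogous to metadataChanged(oldMetadata, sstable): move the
        // sstable from its old level bucket into the new one
        void metadataChanged(String sstable, int oldLevel, int newLevel)
        {
            if (oldLevel == newLevel)
                return; // nothing moved; repair-status changes are handled elsewhere
            Set<String> old = byLevel.get(oldLevel);
            if (old != null)
                old.remove(sstable);
            add(sstable, newLevel);
        }

        public static void main(String[] args)
        {
            LevelBucketsDemo demo = new LevelBucketsDemo();
            demo.add("sstable-1", 0);
            demo.metadataChanged("sstable-1", 0, 1);
            System.out.println(demo.byLevel); // e.g. {0=[], 1=[sstable-1]}
        }
    }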
diff --cc src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
index 63b7909,0000000..95fc7b8
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
@@@ -1,210 -1,0 +1,210 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db.compaction;
 +
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.function.Supplier;
 +
 +import com.google.common.base.Preconditions;
 +
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.SerializationHeader;
 +import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.schema.CompactionParams;
 +
 +/**
 +  * Wrapper that's aware of how sstables are divided between separate strategies,
 + * and provides a standard interface to them
 + *
 + * not threadsafe, calls must be synchronized by caller
 + */
 +public abstract class AbstractStrategyHolder
 +{
 +    public static class TaskSupplier implements Comparable<TaskSupplier>
 +    {
 +        private final int numRemaining;
 +        private final Supplier<AbstractCompactionTask> supplier;
 +
 +        TaskSupplier(int numRemaining, Supplier<AbstractCompactionTask> supplier)
 +        {
 +            this.numRemaining = numRemaining;
 +            this.supplier = supplier;
 +        }
 +
 +        public AbstractCompactionTask getTask()
 +        {
 +            return supplier.get();
 +        }
 +
 +        public int compareTo(TaskSupplier o)
 +        {
 +            return o.numRemaining - numRemaining;
 +        }
 +    }
 +
 +    public static interface DestinationRouter
 +    {
 +        int getIndexForSSTable(SSTableReader sstable);
 +        int getIndexForSSTableDirectory(Descriptor descriptor);
 +    }
 +
 +    /**
 +     * Maps sstables to their token partition bucket
 +     */
-     static class GroupedSSTableContainer
++    public static class GroupedSSTableContainer
 +    {
 +        private final AbstractStrategyHolder holder;
 +        private final Set<SSTableReader>[] groups;
 +
 +        private GroupedSSTableContainer(AbstractStrategyHolder holder)
 +        {
 +            this.holder = holder;
 +            Preconditions.checkArgument(holder.numTokenPartitions > 0, "numTokenPartitions not set");
 +            groups = new Set[holder.numTokenPartitions];
 +        }
 +
 +        void add(SSTableReader sstable)
 +        {
 +            Preconditions.checkArgument(holder.managesSSTable(sstable), "this strategy holder doesn't manage %s", sstable);
 +            int idx = holder.router.getIndexForSSTable(sstable);
 +            Preconditions.checkState(idx >= 0 && idx < holder.numTokenPartitions, "Invalid sstable index (%s) for %s", idx, sstable);
 +            if (groups[idx] == null)
 +                groups[idx] = new HashSet<>();
 +            groups[idx].add(sstable);
 +        }
 +
-         int numGroups()
++        public int numGroups()
 +        {
 +            return groups.length;
 +        }
 +
-         Set<SSTableReader> getGroup(int i)
++        public Set<SSTableReader> getGroup(int i)
 +        {
 +            Preconditions.checkArgument(i >= 0 && i < groups.length);
 +            Set<SSTableReader> group = groups[i];
 +            return group != null ? group : Collections.emptySet();
 +        }
 +
 +        boolean isGroupEmpty(int i)
 +        {
 +            return getGroup(i).isEmpty();
 +        }
 +
 +        boolean isEmpty()
 +        {
 +            for (int i = 0; i < groups.length; i++)
 +                if (!isGroupEmpty(i))
 +                    return false;
 +            return true;
 +        }
 +    }
 +
 +    protected final ColumnFamilyStore cfs;
 +    final DestinationRouter router;
 +    private int numTokenPartitions = -1;
 +
 +    AbstractStrategyHolder(ColumnFamilyStore cfs, DestinationRouter router)
 +    {
 +        this.cfs = cfs;
 +        this.router = router;
 +    }
 +
 +    public abstract void startup();
 +
 +    public abstract void shutdown();
 +
 +    final void setStrategy(CompactionParams params, int numTokenPartitions)
 +    {
 +        Preconditions.checkArgument(numTokenPartitions > 0, "at least one token partition required");
 +        shutdown();
 +        this.numTokenPartitions = numTokenPartitions;
 +        setStrategyInternal(params, numTokenPartitions);
 +    }
 +
 +    protected abstract void setStrategyInternal(CompactionParams params, int numTokenPartitions);
 +
 +    /**
 +     * SSTables are grouped by their repaired and pending repair status. This method determines if this holder
 +     * holds the sstable for the given repaired/grouped statuses. Holders should be mutually exclusive in the
 +     * groups they deal with. IOW, if one holder returns true for a given isRepaired/isPendingRepair combo,
 +     * none of the others should.
 +     */
 +    public abstract boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair, boolean isTransient);
 +
 +    public boolean managesSSTable(SSTableReader sstable)
 +    {
 +        return managesRepairedGroup(sstable.isRepaired(), sstable.isPendingRepair(), sstable.isTransient());
 +    }
 +
 +    public abstract AbstractCompactionStrategy getStrategyFor(SSTableReader sstable);
 +
 +    public abstract Iterable<AbstractCompactionStrategy> allStrategies();
 +
 +    public abstract Collection<TaskSupplier> getBackgroundTaskSuppliers(int gcBefore);
 +
 +    public abstract Collection<AbstractCompactionTask> getMaximalTasks(int gcBefore, boolean splitOutput);
 +
 +    public abstract Collection<AbstractCompactionTask> getUserDefinedTasks(GroupedSSTableContainer sstables, int gcBefore);
 +
 +    public GroupedSSTableContainer createGroupedSSTableContainer()
 +    {
 +        return new GroupedSSTableContainer(this);
 +    }
 +
 +    public abstract void addSSTables(GroupedSSTableContainer sstables);
 +
 +    public abstract void removeSSTables(GroupedSSTableContainer sstables);
 +
 +    public abstract void replaceSSTables(GroupedSSTableContainer removed, GroupedSSTableContainer added);
 +
 +    public abstract List<ISSTableScanner> getScanners(GroupedSSTableContainer sstables, Collection<Range<Token>> ranges);
 +
 +
 +    public abstract SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
 +                                                                long keyCount,
 +                                                                long repairedAt,
 +                                                                UUID pendingRepair,
 +                                                                boolean isTransient,
 +                                                                MetadataCollector collector,
 +                                                                SerializationHeader header,
 +                                                                Collection<Index> indexes,
 +                                                                LifecycleNewTracker lifecycleNewTracker);
 +
 +    /**
 +     * Return the directory index the given compaction strategy belongs to, or -1
 +     * if it's not held by this holder
 +     */
 +    public abstract int getStrategyIndex(AbstractCompactionStrategy strategy);
 +
 +    public abstract boolean containsSSTable(SSTableReader sstable);
 +}
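Note the deliberately reversed subtraction in TaskSupplier.compareTo above:
it orders suppliers with the most remaining tasks first. A tiny standalone
check of that ordering (this demo copies only the comparison idiom and is
not the real TaskSupplier; the subtraction idiom also assumes counts small
enough not to overflow):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class TaskSupplierOrderingDemo implements Comparable<TaskSupplierOrderingDemo>
    {
        final int numRemaining;

        TaskSupplierOrderingDemo(int numRemaining)
        {
            this.numRemaining = numRemaining;
        }

        public int compareTo(TaskSupplierOrderingDemo o)
        {
            // reversed on purpose: larger numRemaining sorts first
            return o.numRemaining - numRemaining;
        }

        public static void main(String[] args)
        {
            List<TaskSupplierOrderingDemo> suppliers = new ArrayList<>();
            suppliers.add(new TaskSupplierOrderingDemo(1));
            suppliers.add(new TaskSupplierOrderingDemo(7));
            suppliers.add(new TaskSupplierOrderingDemo(3));
            Collections.sort(suppliers);
            for (TaskSupplierOrderingDemo s : suppliers)
                System.out.println(s.numRemaining); // prints 7, 3, 1
        }
    }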
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 2e1fd0e,c80504c..f3c9a66
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -33,15 -23,13 +33,12 @@@ import java.util.UUID
  import java.util.concurrent.locks.ReentrantReadWriteLock;
  import java.util.function.Supplier;
  import java.util.stream.Collectors;
 -import java.util.stream.Stream;
  
  import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.ImmutableList;
  import com.google.common.collect.Iterables;
 -import com.google.common.primitives.Ints;
 -
 -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 +import com.google.common.collect.Lists;
 +import com.google.common.primitives.Longs;
- 
- import org.apache.cassandra.db.compaction.PendingRepairManager.CleanupTask;
- import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -49,8 -37,8 +46,10 @@@ import org.apache.cassandra.config.Data
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Directories;
  import org.apache.cassandra.db.DiskBoundaries;
 -import org.apache.cassandra.db.Memtable;
  import org.apache.cassandra.db.SerializationHeader;
 +import org.apache.cassandra.db.compaction.AbstractStrategyHolder.TaskSupplier;
++import org.apache.cassandra.db.compaction.PendingRepairManager.CleanupTask;
++import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
  import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.apache.cassandra.db.lifecycle.SSTableSet;
  import org.apache.cassandra.dht.Range;
@@@ -135,10 -102,12 +134,12 @@@ public class CompactionStrategyManager 
  
         If a user changes the local compaction strategy and then later ALTERs a compaction parameter,
          we will use the new compaction parameters.
 -     **/
 +     */
      private volatile CompactionParams schemaCompactionParams;
-     private boolean supportsEarlyOpen;
-     private int fanout;
+     private volatile boolean supportsEarlyOpen;
+     private volatile int fanout;
+     private volatile long maxSSTableSizeBytes;
+     private volatile String name;
  
      public CompactionStrategyManager(ColumnFamilyStore cfs)
      {
@@@ -310,9 -215,12 +311,11 @@@
                  if (sstable.openReason != SSTableReader.OpenReason.EARLY)
                      compactionStrategyFor(sstable).addSSTable(sstable);
              }
 -            repaired.forEach(AbstractCompactionStrategy::startup);
 -            unrepaired.forEach(AbstractCompactionStrategy::startup);
 -            supportsEarlyOpen = repaired.get(0).supportsEarlyOpen();
 -            fanout = (repaired.get(0) instanceof LeveledCompactionStrategy) ? ((LeveledCompactionStrategy) repaired.get(0)).getLevelFanoutSize() : LeveledCompactionStrategy.DEFAULT_LEVEL_FANOUT_SIZE;
 -            name = repaired.get(0).getName();
 -            maxSSTableSizeBytes = repaired.get(0).getMaxSSTableBytes();
 +            holders.forEach(AbstractStrategyHolder::startup);
 +            supportsEarlyOpen = repaired.first().supportsEarlyOpen();
 +            fanout = (repaired.first() instanceof LeveledCompactionStrategy) ? ((LeveledCompactionStrategy) repaired.first()).getLevelFanoutSize() : LeveledCompactionStrategy.DEFAULT_LEVEL_FANOUT_SIZE;
++            maxSSTableSizeBytes = repaired.first().getMaxSSTableBytes();
++            name = repaired.first().getName();
          }
          finally
          {
@@@ -362,10 -275,9 +365,9 @@@
       * @param sstable
       * @return
       */
-     @VisibleForTesting
      int compactionStrategyIndexFor(SSTableReader sstable)
      {
 -        // should not call maybeReload because it may be called from within lock
 +        // should not call maybeReloadDiskBoundaries because it may be called from within lock
          readLock.lock();
          try
          {
@@@ -568,10 -434,10 +569,10 @@@
          readLock.lock();
          try
          {
 -            if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
 +            if (repaired.first() instanceof LeveledCompactionStrategy)
              {
-                 int[] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
+                 int[] res = new int[LeveledGenerations.MAX_LEVEL_COUNT];
 -                for (AbstractCompactionStrategy strategy : repaired)
 +                for (AbstractCompactionStrategy strategy : getAllStrategies())
                  {
                     int[] repairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
                      res = sumArrays(res, repairedCountPerLevel);
@@@ -601,201 -472,135 +602,175 @@@
          return res;
      }
  
 -    public Directories getDirectories()
 -    {
 -        maybeReloadDiskBoundaries();
 -        readLock.lock();
 -        try
 -        {
 -            assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
 -            return repaired.get(0).getDirectories();
 -        }
 -        finally
 -        {
 -            readLock.unlock();
 -        }
 -    }
 -
+     /**
+      * Should only be called holding the readLock
+      */
      private void handleFlushNotification(Iterable<SSTableReader> added)
      {
-         // If reloaded, SSTables will be placed in their correct locations
-         // so there is no need to process notification
-         if (maybeReloadDiskBoundaries())
-             return;
- 
-         readLock.lock();
-         try
-         {
-             for (SSTableReader sstable : added)
-                 compactionStrategyFor(sstable).addSSTable(sstable);
-         }
-         finally
-         {
-             readLock.unlock();
-         }
+         for (SSTableReader sstable : added)
+             compactionStrategyFor(sstable).addSSTable(sstable);
      }
  
 -    /**
 -     * Should only be called holding the readLock
 -     */
 -    private void handleListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed)
 +    private int getHolderIndex(SSTableReader sstable)
      {
 -        // a bit of gymnastics to be able to replace sstables in compaction strategies
 -        // we use this to know that a compaction finished and where to start the next compaction in LCS
 -        int locationSize = partitionSSTablesByTokenRange? currentBoundaries.directories.size() : 1;
 +        for (int i = 0; i < holders.size(); i++)
 +        {
 +            if (holders.get(i).managesSSTable(sstable))
 +                return i;
 +        }
  
 -        List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
 -        List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
 -        List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
 -        List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
 +        throw new IllegalStateException("No holder claimed " + sstable);
 +    }
  
 -        for (int i = 0; i < locationSize; i++)
 +    private AbstractStrategyHolder getHolder(SSTableReader sstable)
 +    {
 +        for (AbstractStrategyHolder holder : holders)
          {
 -            repairedRemoved.add(new HashSet<>());
 -            repairedAdded.add(new HashSet<>());
 -            unrepairedRemoved.add(new HashSet<>());
 -            unrepairedAdded.add(new HashSet<>());
 +            if (holder.managesSSTable(sstable))
 +                return holder;
          }
  
 -        for (SSTableReader sstable : removed)
 +        throw new IllegalStateException("No holder claimed " + sstable);
 +    }
 +
 +    private AbstractStrategyHolder getHolder(long repairedAt, UUID pendingRepair, boolean isTransient)
 +    {
 +        return getHolder(repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE,
 +                         pendingRepair != ActiveRepairService.NO_PENDING_REPAIR,
 +                         isTransient);
 +    }
 +
 +    @VisibleForTesting
 +    AbstractStrategyHolder getHolder(boolean isRepaired, boolean isPendingRepair, boolean isTransient)
 +    {
 +        for (AbstractStrategyHolder holder : holders)
          {
 -            int i = compactionStrategyIndexFor(sstable);
 -            if (sstable.isRepaired())
 -                repairedRemoved.get(i).add(sstable);
 -            else
 -                unrepairedRemoved.get(i).add(sstable);
 +            if (holder.managesRepairedGroup(isRepaired, isPendingRepair, isTransient))
 +                return holder;
          }
 -        for (SSTableReader sstable : added)
 +
 +        throw new IllegalStateException(String.format("No holder claimed isRepaired: %s, isPendingRepair: %s",
 +                                                      isRepaired, isPendingRepair));
 +    }
 +
 +    @VisibleForTesting
 +    ImmutableList<AbstractStrategyHolder> getHolders()
 +    {
 +        return holders;
 +    }
 +
 +    /**
 +     * Split sstables into a list of grouped sstable containers, the list index an sstable
 +     * lives in matches the list index of the holder that's responsible for it
 +     */
-     @VisibleForTesting
-     List<GroupedSSTableContainer> groupSSTables(Iterable<SSTableReader> sstables)
++    public List<GroupedSSTableContainer> groupSSTables(Iterable<SSTableReader> sstables)
 +    {
 +        List<GroupedSSTableContainer> classified = new ArrayList<>(holders.size());
 +        for (AbstractStrategyHolder holder : holders)
          {
 -            int i = compactionStrategyIndexFor(sstable);
 -            if (sstable.isRepaired())
 -                repairedAdded.get(i).add(sstable);
 -            else
 -                unrepairedAdded.get(i).add(sstable);
 +            classified.add(holder.createGroupedSSTableContainer());
          }
 -        for (int i = 0; i < locationSize; i++)
 +
 +        for (SSTableReader sstable : sstables)
          {
 -            if (!repairedRemoved.get(i).isEmpty())
 -                repaired.get(i).replaceSSTables(repairedRemoved.get(i), repairedAdded.get(i));
 -            else
 -                repaired.get(i).addSSTables(repairedAdded.get(i));
 +            classified.get(getHolderIndex(sstable)).add(sstable);
 +        }
  
 -            if (!unrepairedRemoved.get(i).isEmpty())
 -                unrepaired.get(i).replaceSSTables(unrepairedRemoved.get(i), unrepairedAdded.get(i));
 -            else
 -                unrepaired.get(i).addSSTables(unrepairedAdded.get(i));
 +        return classified;
 +    }
 +
++    /**
++     * Should only be called holding the readLock
++     */
 +    private void handleListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed)
 +    {
-         // If reloaded, SSTables will be placed in their correct locations
-         // so there is no need to process notification
-         if (maybeReloadDiskBoundaries())
-             return;
- 
-         readLock.lock();
-         try
++        List<GroupedSSTableContainer> addedGroups = groupSSTables(added);
++        List<GroupedSSTableContainer> removedGroups = groupSSTables(removed);
++        for (int i=0; i<holders.size(); i++)
 +        {
-             List<GroupedSSTableContainer> addedGroups = groupSSTables(added);
-             List<GroupedSSTableContainer> removedGroups = groupSSTables(removed);
-             for (int i=0; i<holders.size(); i++)
-             {
-                 holders.get(i).replaceSSTables(removedGroups.get(i), addedGroups.get(i));
-             }
-         }
-         finally
-         {
-             readLock.unlock();
++            holders.get(i).replaceSSTables(removedGroups.get(i), addedGroups.get(i));
          }
      }
  
++    /**
++     * Should only be called holding the readLock
++     */
      private void handleRepairStatusChangedNotification(Iterable<SSTableReader> sstables)
      {
-         // If reloaded, SSTables will be placed in their correct locations
-         // so there is no need to process notification
-         if (maybeReloadDiskBoundaries())
-             return;
-         // we need a write lock here since we move sstables from one strategy instance to another
-         readLock.lock();
-         try
 -        for (SSTableReader sstable : sstables)
++        List<GroupedSSTableContainer> groups = groupSSTables(sstables);
++        for (int i = 0; i < holders.size(); i++)
          {
-             List<GroupedSSTableContainer> groups = groupSSTables(sstables);
-             for (int i = 0; i < holders.size(); i++)
 -            int index = compactionStrategyIndexFor(sstable);
 -            if (sstable.isRepaired())
--            {
-                 GroupedSSTableContainer group = groups.get(i);
 -                unrepaired.get(index).removeSSTable(sstable);
 -                repaired.get(index).addSSTable(sstable);
 -            }
 -            else
++            GroupedSSTableContainer group = groups.get(i);
 +
-                 if (group.isEmpty())
-                     continue;
++            if (group.isEmpty())
++                continue;
 +
-                 AbstractStrategyHolder dstHolder = holders.get(i);
- 
-                 for (AbstractStrategyHolder holder : holders)
-                 {
-                     if (holder != dstHolder)
-                         holder.removeSSTables(group);
-                 }
- 
-                 // adding sstables into another strategy may change its level,
-                 // thus it won't be removed from original LCS. We have to remove sstables first
-                 dstHolder.addSSTables(group);
++            AbstractStrategyHolder dstHolder = holders.get(i);
++            for (AbstractStrategyHolder holder : holders)
+             {
 -                repaired.get(index).removeSSTable(sstable);
 -                unrepaired.get(index).addSSTable(sstable);
++                if (holder != dstHolder)
++                    holder.removeSSTables(group);
              }
-         }
-         finally
-         {
-             readLock.unlock();
++
++            // adding sstables into another strategy may change its level,
++            // thus it won't be removed from original LCS. We have to remove sstables first
++            dstHolder.addSSTables(group);
          }
 +    }
  
++    /**
++     * Should only be called holding the readLock
++     */
 +    private void handleMetadataChangedNotification(SSTableReader sstable, StatsMetadata oldMetadata)
 +    {
 +        AbstractCompactionStrategy acs = getCompactionStrategyFor(sstable);
 +        acs.metadataChanged(oldMetadata, sstable);
      }
  
++    /**
++     * Should only be called holding the readLock
++     */
      private void handleDeletingNotification(SSTableReader deleted)
      {
-         // If reloaded, SSTables will be placed in their correct locations
-         // so there is no need to process notification
-         if (maybeReloadDiskBoundaries())
-             return;
-         readLock.lock();
-         try
-         {
-             compactionStrategyFor(deleted).removeSSTable(deleted);
-         }
-         finally
-         {
-             readLock.unlock();
-         }
+         compactionStrategyFor(deleted).removeSSTable(deleted);
      }
  
      public void handleNotification(INotification notification, Object sender)
      {
-         if (notification instanceof SSTableAddedNotification)
-         {
-             handleFlushNotification(((SSTableAddedNotification) notification).added);
-         }
-         else if (notification instanceof SSTableListChangedNotification)
-         {
-             SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
-             handleListChangedNotification(listChangedNotification.added, listChangedNotification.removed);
-         }
-         else if (notification instanceof SSTableRepairStatusChanged)
-         {
-             handleRepairStatusChangedNotification(((SSTableRepairStatusChanged) notification).sstables);
-         }
-         else if (notification instanceof SSTableDeletingNotification)
+         // we might race with reload adding/removing the sstables, this means that compaction strategies
+         // must handle double notifications.
+         maybeReloadDiskBoundaries();
+         readLock.lock();
+         try
          {
-             handleDeletingNotification(((SSTableDeletingNotification) notification).deleting);
++
+             if (notification instanceof SSTableAddedNotification)
+             {
+                 handleFlushNotification(((SSTableAddedNotification) notification).added);
+             }
+             else if (notification instanceof SSTableListChangedNotification)
+             {
+                 SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
+                 handleListChangedNotification(listChangedNotification.added, listChangedNotification.removed);
+             }
+             else if (notification instanceof SSTableRepairStatusChanged)
+             {
+                 handleRepairStatusChangedNotification(((SSTableRepairStatusChanged) notification).sstables);
+             }
+             else if (notification instanceof SSTableDeletingNotification)
+             {
+                 handleDeletingNotification(((SSTableDeletingNotification) notification).deleting);
+             }
++            else if (notification instanceof SSTableMetadataChanged)
++            {
++                SSTableMetadataChanged lcNotification = (SSTableMetadataChanged) notification;
++                handleMetadataChangedNotification(lcNotification.sstable, lcNotification.oldMetadata);
++            }
          }
-         else if (notification instanceof SSTableMetadataChanged)
+         finally
          {
-             SSTableMetadataChanged lcNotification = (SSTableMetadataChanged) notification;
-             handleMetadataChangedNotification(lcNotification.sstable, lcNotification.oldMetadata);
+             readLock.unlock();
          }
      }
  
@@@ -835,45 -649,47 +810,45 @@@
       * @return
       */
      @SuppressWarnings("resource")
 -    public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> ranges)
 +    public AbstractCompactionStrategy.ScannerList maybeGetScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> ranges)
      {
          maybeReloadDiskBoundaries();
-         readLock.lock();
 +        List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
+         readLock.lock();
          try
          {
 -            assert repaired.size() == unrepaired.size();
 -            List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
 -            List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>();
 +            List<GroupedSSTableContainer> sstableGroups = groupSSTables(sstables);
  
 -            for (int i = 0; i < repaired.size(); i++)
 +            for (int i = 0; i < holders.size(); i++)
              {
 -                repairedSSTables.add(new HashSet<>());
 -                unrepairedSSTables.add(new HashSet<>());
 -            }
 -
 -            for (SSTableReader sstable : sstables)
 -            {
 -                if (sstable.isRepaired())
 -                    repairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable);
 -                else
 -                    unrepairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable);
 +                AbstractStrategyHolder holder = holders.get(i);
 +                GroupedSSTableContainer group = sstableGroups.get(i);
 +                scanners.addAll(holder.getScanners(group, ranges));
              }
 +        }
 +        catch (PendingRepairManager.IllegalSSTableArgumentException e)
 +        {
 +            ISSTableScanner.closeAllAndPropagate(scanners, new ConcurrentModificationException(e));
 +        }
 +        finally
 +        {
 +            readLock.unlock();
 +        }
 +        return new AbstractCompactionStrategy.ScannerList(scanners);
 +    }
  
 -            List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
 -            for (int i = 0; i < repairedSSTables.size(); i++)
 +    public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> ranges)
 +    {
 +        while (true)
 +        {
 +            try
              {
 -                if (!repairedSSTables.get(i).isEmpty())
 -                    scanners.addAll(repaired.get(i).getScanners(repairedSSTables.get(i), ranges).scanners);
 +                return maybeGetScanners(sstables, ranges);
              }
 -            for (int i = 0; i < unrepairedSSTables.size(); i++)
 +            catch (ConcurrentModificationException e)
              {
 -                if (!unrepairedSSTables.get(i).isEmpty())
 -                    scanners.addAll(unrepaired.get(i).getScanners(unrepairedSSTables.get(i), ranges).scanners);
 +                logger.debug("SSTable repairedAt/pendingRepaired values changed while getting scanners");
              }
 -
 -            return new AbstractCompactionStrategy.ScannerList(scanners);
 -        }
 -        finally
 -        {
 -            readLock.unlock();
          }
      }
  
diff --cc src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 74ffccb,77cb223..dd7c9df
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@@ -21,7 -21,7 +21,6 @@@ import java.util.*
  
  
  import com.google.common.annotations.VisibleForTesting;
--import com.google.common.base.Joiner;
  import com.google.common.collect.*;
  import com.google.common.primitives.Doubles;
  
@@@ -348,16 -340,15 +347,22 @@@ public class LeveledCompactionStrategy 
      }
  
      @Override
 +    public void metadataChanged(StatsMetadata oldMetadata, SSTableReader sstable)
 +    {
 +        if (sstable.getSSTableLevel() != oldMetadata.sstableLevel)
 +            manifest.newLevel(sstable, oldMetadata.sstableLevel);
 +    }
 +
 +    @Override
+     public void addSSTables(Iterable<SSTableReader> sstables)
+     {
+         manifest.addSSTables(sstables);
+     }
+ 
+     @Override
      public void addSSTable(SSTableReader added)
      {
-         manifest.add(added);
+         manifest.addSSTables(Collections.singleton(added));
      }
  
      @Override
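For context on the new notification: metadataChanged fires after an sstable's Stats component has been rewritten in place, letting LCS move the reader between manifest levels without a remove/add cycle. A hedged sketch of the sequence (the driver variables are hypothetical; getSSTableMetadata, mutateLevelAndReload and metadataChanged are the names used in this commit):

    StatsMetadata before = sstable.getSSTableMetadata(); // level as currently recorded
    sstable.mutateLevelAndReload(0);                     // rewrite Stats component, reload reader
    strategy.metadataChanged(before, sstable);           // manifest.newLevel(sstable, before.sstableLevel)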
diff --cc src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
index 0000000,f7087f0..bc83275
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
@@@ -1,0 -1,311 +1,310 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.compaction;
+ 
+ import java.io.IOException;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.Set;
+ import java.util.TreeSet;
+ import java.util.concurrent.TimeUnit;
+ 
+ import com.google.common.collect.ImmutableSet;
+ import com.google.common.collect.Iterators;
+ import com.google.common.collect.PeekingIterator;
+ import com.google.common.primitives.Ints;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
+ /**
+  * Handles the leveled manifest generations
+  *
+  * Not thread safe, all access should be synchronized in LeveledManifest
+  */
+ class LeveledGenerations
+ {
+     private static final Logger logger = LoggerFactory.getLogger(LeveledGenerations.class);
+     private final boolean strictLCSChecksTest = Boolean.getBoolean(Config.PROPERTY_PREFIX + "test.strict_lcs_checks");
+     // allocate enough generations for a PB of data, with a 1-MB sstable size.  (Note that if maxSSTableSize is
+     // updated, we will still have sstables of the older, potentially smaller size.  So don't make this
+     // dependent on maxSSTableSize.)
+     static final int MAX_LEVEL_COUNT = (int) Math.log10(1000 * 1000 * 1000);
+ 
+     private final Set<SSTableReader> l0 = new HashSet<>();
+     private static long lastOverlapCheck = System.nanoTime();
+     // note that since l0 is broken out, levels[0] represents L1:
+     private final TreeSet<SSTableReader> [] levels = new TreeSet[MAX_LEVEL_COUNT - 1];
+ 
+     private static final Comparator<SSTableReader> nonL0Comparator = (o1, o2) -> {
+         int cmp = SSTableReader.sstableComparator.compare(o1, o2);
+         if (cmp == 0)
+             cmp = Ints.compare(o1.descriptor.generation, o2.descriptor.generation);
+         return cmp;
+     };
+ 
+     LeveledGenerations()
+     {
+         for (int i = 0; i < MAX_LEVEL_COUNT - 1; i++)
+             levels[i] = new TreeSet<>(nonL0Comparator);
+     }
+ 
+     Set<SSTableReader> get(int level)
+     {
+         if (level > levelCount() - 1 || level < 0)
+             throw new ArrayIndexOutOfBoundsException("Invalid generation " + level + " - maximum is " + (levelCount() - 1));
+         if (level == 0)
+             return l0;
+         return levels[level - 1];
+     }
+ 
+     int levelCount()
+     {
+         return levels.length + 1;
+     }
+ 
+     /**
+      * Adds readers to the correct level
+      *
+      * If adding an sstable would cause an overlap in the level (for levels above L0) we send it to L0. This can happen,
+      * for example, when moving sstables from unrepaired to repaired.
+      *
+      * If the sstable is already in the manifest we skip it.
+      *
+      * If the sstable exists in the manifest but has the wrong level, it is removed from the wrong level and added to the correct one.
+      *
+      * todo: group sstables per level, add all if level is currently empty, improve startup speed
+      */
+     void addAll(Iterable<SSTableReader> readers)
+     {
+         logDistribution();
+         for (SSTableReader sstable : readers)
+         {
+             assert sstable.getSSTableLevel() < levelCount() : "Invalid level " + sstable.getSSTableLevel() + " out of " + (levelCount() - 1);
+             int existingLevel = getLevelIfExists(sstable);
+             if (existingLevel != -1)
+             {
+                 if (sstable.getSSTableLevel() != existingLevel)
+                 {
+                     logger.error("SSTable {} on the wrong level in the manifest - {} instead of {} as recorded in the sstable metadata, removing from level {}", sstable, existingLevel, sstable.getSSTableLevel(), existingLevel);
+                     if (strictLCSChecksTest)
+                         throw new AssertionError("SSTable not in matching level in manifest: " + sstable + ": " + existingLevel + " != " + sstable.getSSTableLevel());
+                     get(existingLevel).remove(sstable);
+                 }
+                 else
+                 {
+                     logger.info("Manifest already contains {} in level {} - skipping", sstable, existingLevel);
+                     continue;
+                 }
+             }
+ 
+             if (sstable.getSSTableLevel() == 0)
+             {
+                 l0.add(sstable);
+                 continue;
+             }
+ 
+             TreeSet<SSTableReader> level = levels[sstable.getSSTableLevel() - 1];
+             /*
+             current level: |-----||----||----|        |---||---|
+               new sstable:                      |--|
+                                           ^ before
+                                                         ^ after
+                 overlap if before.last >= newsstable.first or after.first <= newsstable.last
+              */
+             SSTableReader after = level.ceiling(sstable);
+             SSTableReader before = level.floor(sstable);
+ 
+             if (before != null && before.last.compareTo(sstable.first) >= 0 ||
+                 after != null && after.first.compareTo(sstable.last) <= 0)
+             {
+                 if (strictLCSChecksTest) // we can only assert this in tests since this is normal when for example moving sstables from unrepaired to repaired
+                     throw new AssertionError("Got unexpected overlap in level " + sstable.getSSTableLevel());
+                 sendToL0(sstable);
+             }
+             else
+             {
+                 level.add(sstable);
+             }
+         }
+         maybeVerifyLevels();
+     }
+ 
+     /**
+      * Sends sstable to L0 by mutating its level in the sstable metadata.
+      *
+      * SSTable should not exist in the manifest
+      */
+     private void sendToL0(SSTableReader sstable)
+     {
+         try
+         {
 -            sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 0);
 -            sstable.reloadSSTableMetadata();
++            sstable.mutateLevelAndReload(0);
+         }
+         catch (IOException e)
+         {
+             // Adding it to L0 and marking suspect is probably the best we can do here - it won't create overlap
+             // and we won't pick it for later compactions.
+             logger.error("Failed mutating sstable metadata for {} - adding it to L0 to avoid overlap. Marking suspect", sstable, e);
+             sstable.markSuspect();
+         }
+         l0.add(sstable);
+     }
+ 
+     /**
+      * Tries to find the sstable in the levels without using the sstable-recorded level
+      *
+      * Used to make sure we don't try to re-add an existing sstable
+      */
+     private int getLevelIfExists(SSTableReader sstable)
+     {
+         for (int i = 0; i < levelCount(); i++)
+         {
+             if (get(i).contains(sstable))
+                 return i;
+         }
+         return -1;
+     }
+ 
+     int remove(Collection<SSTableReader> readers)
+     {
+         int minLevel = Integer.MAX_VALUE;
+         for (SSTableReader sstable : readers)
+         {
+             int level = sstable.getSSTableLevel();
+             minLevel = Math.min(minLevel, level);
+             get(level).remove(sstable);
+         }
+         return minLevel;
+     }
+ 
+     int[] getAllLevelSize()
+     {
+         int[] counts = new int[levelCount()];
+         for (int i = 0; i < levelCount(); i++)
+             counts[i] = get(i).size();
+         return counts;
+     }
+ 
+     Set<SSTableReader> allSSTables()
+     {
+         ImmutableSet.Builder<SSTableReader> builder = ImmutableSet.builder();
+         builder.addAll(l0);
+         for (Set<SSTableReader> sstables : levels)
+             builder.addAll(sstables);
+         return builder.build();
+     }
+ 
+     /**
+      * given a level with sstables with first tokens [0, 10, 20, 30] and a lastCompactedSSTable with last = 15, we will
+      * return an Iterator over [20, 30, 0, 10].
+      */
+     Iterator<SSTableReader> wrappingIterator(int lvl, SSTableReader lastCompactedSSTable)
+     {
+         assert lvl > 0; // only makes sense in L1+
+         TreeSet<SSTableReader> level = levels[lvl - 1];
+         if (level.isEmpty())
+             return Collections.emptyIterator();
+         if (lastCompactedSSTable == null)
+             return level.iterator();
+ 
+         PeekingIterator<SSTableReader> tail = Iterators.peekingIterator(level.tailSet(lastCompactedSSTable).iterator());
+         SSTableReader pivot = null;
+         // then we need to make sure that the first token of the pivot is greater than the last token of the lastCompactedSSTable
+         while (tail.hasNext())
+         {
+             SSTableReader potentialPivot = tail.peek();
+             if (potentialPivot.first.compareTo(lastCompactedSSTable.last) > 0)
+             {
+                 pivot = potentialPivot;
+                 break;
+             }
+             tail.next();
+         }
+ 
+         if (pivot == null)
+             return level.iterator();
+ 
+         return Iterators.concat(tail, level.headSet(pivot, false).iterator());
+     }
+ 
+     void logDistribution()
+     {
+         if (logger.isTraceEnabled())
+         {
+             for (int i = 0; i < levelCount(); i++)
+             {
+                 Set<SSTableReader> level = get(i);
+                 if (!level.isEmpty())
+                 {
+                     logger.trace("L{} contains {} SSTables ({}) in {}",
+                                  i,
+                                  level.size(),
+                                  FBUtilities.prettyPrintMemory(SSTableReader.getTotalBytes(level)),
+                                  this);
+                 }
+             }
+         }
+     }
+ 
+     Set<SSTableReader>[] snapshot()
+     {
+         Set<SSTableReader> [] levelsCopy = new Set[levelCount()];
+         for (int i = 0; i < levelCount(); i++)
+             levelsCopy[i] = ImmutableSet.copyOf(get(i));
+         return levelsCopy;
+     }
+ 
+     /**
+      * do extra verification of the sstables in the generations
+      *
+      * only used during tests
+      */
+     private void maybeVerifyLevels()
+     {
+         if (!strictLCSChecksTest || System.nanoTime() - lastOverlapCheck <= TimeUnit.NANOSECONDS.convert(5, TimeUnit.SECONDS))
+             return;
+         logger.info("LCS verifying levels");
+         lastOverlapCheck = System.nanoTime();
+         for (int i = 1; i < levelCount(); i++)
+         {
+             SSTableReader prev = null;
+             for (SSTableReader sstable : get(i))
+             {
+                 // no overlap:
+                 assert prev == null || prev.last.compareTo(sstable.first) < 0;
+                 prev = sstable;
+                 // make sure it does not exist in any other level:
+                 for (int j = 0; j < levelCount(); j++)
+                 {
+                     if (i == j)
+                         continue;
+                     assert !get(j).contains(sstable);
+                 }
+             }
+         }
+     }
+ }
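The floor/ceiling test in addAll is the core trick of this class: because every non-L0 level is a TreeSet ordered by first token, only the two nearest neighbours of a candidate need to be inspected to detect overlap. A self-contained sketch of the same check on plain integer intervals (Interval is a stand-in for an sstable's [first, last] token range):

    import java.util.TreeSet;

    final class OverlapSketch
    {
        static final class Interval
        {
            final long first, last;
            Interval(long first, long last) { this.first = first; this.last = last; }
        }

        // ordered by first token, like a non-L0 generation
        static final TreeSet<Interval> level = new TreeSet<>((a, b) -> Long.compare(a.first, b.first));

        // mirrors the comment in addAll:
        // overlap if before.last >= candidate.first or after.first <= candidate.last
        static boolean overlaps(Interval candidate)
        {
            Interval before = level.floor(candidate);   // rightmost interval starting at or before candidate
            Interval after = level.ceiling(candidate);  // leftmost interval starting at or after candidate
            return (before != null && before.last >= candidate.first)
                || (after != null && after.first <= candidate.last);
        }

        public static void main(String[] args)
        {
            level.add(new Interval(10, 20));
            level.add(new Interval(21, 30));
            System.out.println(overlaps(new Interval(5, 10)));  // true - touches [10, 20]
            System.out.println(overlaps(new Interval(31, 40))); // false - fits after [21, 30]
        }
    }

As for the MAX_LEVEL_COUNT sizing comment: Math.log10(1000 * 1000 * 1000) is 9, so nine generations with a fanout of 10 cover on the order of 10^9 one-MB sstables, i.e. roughly a petabyte.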
diff --cc src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 37c9435,d630730..4f70f64
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@@ -217,76 -151,9 +150,8 @@@ public class LeveledManifes
  
          if (logger.isTraceEnabled())
              logger.trace("Adding [{}]", toString(added));
- 
-         for (SSTableReader ssTableReader : added)
-             add(ssTableReader);
-         lastCompactedKeys[minLevel] = SSTableReader.sstableOrdering.max(added).last;
-     }
- 
-     public synchronized void repairOverlappingSSTables(int level)
-     {
-         SSTableReader previous = null;
-         Collections.sort(generations[level], SSTableReader.sstableComparator);
-         List<SSTableReader> outOfOrderSSTables = new ArrayList<>();
-         for (SSTableReader current : generations[level])
-         {
-             if (previous != null && current.first.compareTo(previous.last) <= 0)
-             {
-                 logger.warn("At level {}, {} [{}, {}] overlaps {} [{}, {}].  This could be caused by a bug in Cassandra 1.1.0 .. 1.1.3 or due to the fact that you have dropped sstables from another node into the data directory. " +
-                             "Sending back to L0.  If you didn't drop in sstables, and have not yet run scrub, you should do so since you may also have rows out-of-order within an sstable",
-                             level, previous, previous.first, previous.last, current, current.first, current.last);
-                 outOfOrderSSTables.add(current);
-             }
-             else
-             {
-                 previous = current;
-             }
-         }
- 
-         if (!outOfOrderSSTables.isEmpty())
-         {
-             for (SSTableReader sstable : outOfOrderSSTables)
-                 sendBackToL0(sstable);
-         }
-     }
- 
-     /**
-      * Checks if adding the sstable creates an overlap in the level
-      * @param sstable the sstable to add
-      * @return true if it is safe to add the sstable in the level.
-      */
-     private boolean canAddSSTable(SSTableReader sstable)
-     {
-         int level = sstable.getSSTableLevel();
-         if (level == 0)
-             return true;
- 
-         List<SSTableReader> copyLevel = new ArrayList<>(generations[level]);
-         copyLevel.add(sstable);
-         Collections.sort(copyLevel, SSTableReader.sstableComparator);
- 
-         SSTableReader previous = null;
-         for (SSTableReader current : copyLevel)
-         {
-             if (previous != null && current.first.compareTo(previous.last) <= 0)
-                 return false;
-             previous = current;
-         }
-         return true;
-     }
--
-     private synchronized void sendBackToL0(SSTableReader sstable)
-     {
-         remove(sstable);
-         try
-         {
-             sstable.mutateLevelAndReload(0);
-             add(sstable);
-         }
-         catch (IOException e)
-         {
-             throw new RuntimeException("Could not reload sstable meta data", e);
-         }
+         generations.addAll(added);
+         lastCompactedSSTables[minLevel] = SSTableReader.sstableOrdering.max(added);
      }
  
      private String toString(Collection<SSTableReader> sstables)
@@@ -364,14 -231,9 +229,14 @@@
          // This isn't a magic wand -- if you are consistently writing too fast for LCS to keep
          // up, you're still screwed.  But if instead you have intermittent bursts of activity,
          // it can help a lot.
 +
 +        // Let's check that L0 is far enough behind to warrant STCS.
 +        // If it is, the L0 candidate will be used before proceeding with any higher level.
 +        CompactionCandidate l0Compaction = getSTCSInL0CompactionCandidate();
 +
-         for (int i = generations.length - 1; i > 0; i--)
+         for (int i = generations.levelCount() - 1; i > 0; i--)
          {
-             List<SSTableReader> sstables = getLevel(i);
+             Set<SSTableReader> sstables = generations.get(i);
              if (sstables.isEmpty())
                  continue; // mostly this just avoids polluting the debug log with zero scores
              // we want to calculate score excluding compacting ones
@@@ -394,7 -257,7 +259,7 @@@
                      candidates = getOverlappingStarvedSSTables(nextLevel, candidates);
                      if (logger.isTraceEnabled())
                          logger.trace("Compaction candidates for L{} are {}", i, toString(candidates));
--                    return new CompactionCandidate(candidates, nextLevel, cfs.getCompactionStrategyManager().getMaxSSTableBytes());
++                    return new CompactionCandidate(candidates, nextLevel, maxSSTableSizeInBytes);
                  }
                  else
                  {
@@@ -764,9 -579,9 +582,7 @@@
      @VisibleForTesting
      List<SSTableReader> ageSortedSSTables(Collection<SSTableReader> candidates)
      {
-         List<SSTableReader> ageSortedCandidates = new ArrayList<>(candidates);
-         Collections.sort(ageSortedCandidates, SSTableReader.maxTimestampAscending);
-         return ageSortedCandidates;
 -        List<SSTableReader> copy = new ArrayList<>(candidates);
 -        copy.sort(SSTableReader.maxTimestampAscending);
 -        return ImmutableList.copyOf(copy);
++        return ImmutableList.sortedCopyOf(SSTableReader.maxTimestampAscending, candidates);
      }
  
      public synchronized Set<SSTableReader>[] getSStablesPerLevelSnapshot()
@@@ -818,13 -618,6 +619,13 @@@
              tasks += estimated[i];
          }
  
-         if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > cfs.getMaximumCompactionThreshold())
++        if (!DatabaseDescriptor.getDisableSTCSInL0() && generations.get(0).size() > cfs.getMaximumCompactionThreshold())
 +        {
-             int l0compactions = getLevel(0).size() / cfs.getMaximumCompactionThreshold();
++            int l0compactions = generations.get(0).size() / cfs.getMaximumCompactionThreshold();
 +            tasks += l0compactions;
 +            estimated[0] += l0compactions;
 +        }
 +
          logger.trace("Estimating {} compactions to do for {}.{}",
                       Arrays.toString(estimated), cfs.keyspace.getName(), cfs.name);
          return Ints.checkedCast(tasks);
@@@ -851,25 -644,18 +652,26 @@@
              assert newLevel > 0;
          }
          return newLevel;
+     }
  
+     synchronized Set<SSTableReader> getLevel(int level)
+     {
+         return ImmutableSet.copyOf(generations.get(level));
      }
  
-     public Iterable<SSTableReader> getAllSSTables()
+     synchronized List<SSTableReader> getLevelSorted(int level, Comparator<SSTableReader> comparator)
      {
-         Set<SSTableReader> sstables = new HashSet<>();
-         for (List<SSTableReader> generation : generations)
-         {
-             sstables.addAll(generation);
-         }
-         return sstables;
 -        List<SSTableReader> copy = new ArrayList<>(generations.get(level));
 -        copy.sort(comparator);
 -        return ImmutableList.copyOf(copy);
++        return ImmutableList.sortedCopyOf(comparator, generations.get(level));
 +    }
 +
-     public synchronized void newLevel(SSTableReader sstable, int oldLevel)
++    synchronized void newLevel(SSTableReader sstable, int oldLevel)
 +    {
-         boolean removed = generations[oldLevel].remove(sstable);
-         assert removed : "Could not remove " + sstable +" from " + oldLevel;
-         add(sstable);
-         lastCompactedKeys[oldLevel] = sstable.last;
++        boolean removed = generations.get(oldLevel).remove(sstable);
++        // if reload races with the metadataChanged notification the sstable might already be removed
++        if (!removed)
++            logger.warn("Could not remove "+sstable+" from "+oldLevel);
++        generations.addAll(Collections.singleton(sstable));
++        lastCompactedSSTables[oldLevel] = sstable;
      }
  
      public static class CompactionCandidate
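The ageSortedSSTables and getLevelSorted hunks above are the same cleanup: Guava's ImmutableList.sortedCopyOf collapses the copy/sort/copy dance into one call and leaves the input untouched. A minimal usage sketch:

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import com.google.common.collect.ImmutableList;

    final class SortedCopySketch
    {
        public static void main(String[] args)
        {
            List<Integer> candidates = Arrays.asList(3, 1, 2);
            // copies, sorts, and freezes in one step; candidates stays [3, 1, 2]
            List<Integer> sorted = ImmutableList.sortedCopyOf(Comparator.<Integer>naturalOrder(), candidates);
            System.out.println(sorted); // [1, 2, 3]
        }
    }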
diff --cc src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index d9d8db1,2a4f293..e2fedad
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@@ -245,30 -245,19 +245,16 @@@ public class StandaloneScrubbe
          if (strategyManager.getCompactionParams().klass().equals(LeveledCompactionStrategy.class))
          {
              int maxSizeInMB = (int)((cfs.getCompactionStrategyManager().getMaxSSTableBytes()) / (1024L * 1024L));
- 
-             System.out.println("Checking leveled manifest");
-             Predicate<SSTableReader> repairedPredicate = new Predicate<SSTableReader>()
+             int fanOut = cfs.getCompactionStrategyManager().getLevelFanoutSize();
 -            List<SSTableReader> repaired = new ArrayList<>();
 -            List<SSTableReader> unrepaired = new ArrayList<>();
 -
 -            for (SSTableReader sstable : sstables)
++            for (AbstractStrategyHolder.GroupedSSTableContainer sstableGroup : strategyManager.groupSSTables(sstables))
              {
-                 @Override
-                 public boolean apply(SSTableReader sstable)
 -                if (sstable.isRepaired())
 -                    repaired.add(sstable);
 -                else
 -                    unrepaired.add(sstable);
++                for (int i = 0; i < sstableGroup.numGroups(); i++)
 +                {
-                     return sstable.isRepaired();
++                    List<SSTableReader> groupSSTables = new ArrayList<>(sstableGroup.getGroup(i));
++                    // creating the manifest makes sure the leveling is sane:
++                    LeveledManifest.create(cfs, maxSizeInMB, fanOut, groupSSTables);
 +                }
-             };
- 
-             List<SSTableReader> repaired = Lists.newArrayList(Iterables.filter(sstables, repairedPredicate));
-             List<SSTableReader> unRepaired = Lists.newArrayList(Iterables.filter(sstables, Predicates.not(repairedPredicate)));
- 
-             LeveledManifest repairedManifest = LeveledManifest.create(cfs, maxSizeInMB, cfs.getLevelFanoutSize(), repaired);
-             for (int i = 1; i < repairedManifest.getLevelCount(); i++)
-             {
-                 repairedManifest.repairOverlappingSSTables(i);
-             }
-             LeveledManifest unRepairedManifest = LeveledManifest.create(cfs, maxSizeInMB, cfs.getLevelFanoutSize(), unRepaired);
-             for (int i = 1; i < unRepairedManifest.getLevelCount(); i++)
-             {
-                 unRepairedManifest.repairOverlappingSSTables(i);
              }
 -            LeveledManifest.create(cfs, maxSizeInMB, fanOut, repaired);
 -            LeveledManifest.create(cfs, maxSizeInMB, fanOut, unrepaired);
          }
      }
  
diff --cc test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 8a8ed13,1a3ac44..a7a5396
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@@ -29,8 -29,9 +30,9 @@@ import java.util.List
  import java.util.Map;
  import java.util.Random;
  import java.util.UUID;
+ import java.util.stream.Collectors;
  
 -import junit.framework.Assert;
 +import org.junit.Assert;
  import org.junit.After;
  import org.junit.Before;
  import org.junit.BeforeClass;
@@@ -365,26 -353,24 +364,24 @@@ public class LeveledCompactionStrategyT
          // we only have unrepaired sstables:
          assertEquals(sstableCount, cfs.getLiveSSTables().size());
  
-         SSTableReader sstable1 = unrepaired.manifest.generations[2].get(0);
-         SSTableReader sstable2 = unrepaired.manifest.generations[1].get(0);
+         SSTableReader sstable1 = unrepaired.manifest.getLevel(2).iterator().next();
+         SSTableReader sstable2 = unrepaired.manifest.getLevel(1).iterator().next();
  
 -        sstable1.descriptor.getMetadataSerializer().mutateRepairedAt(sstable1.descriptor, System.currentTimeMillis());
 +        sstable1.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable1.descriptor, System.currentTimeMillis(), null, false);
          sstable1.reloadSSTableMetadata();
          assertTrue(sstable1.isRepaired());
  
          manager.handleNotification(new 
SSTableRepairStatusChanged(Arrays.asList(sstable1)), this);
  
-         int repairedSSTableCount = 0;
-         for (List<SSTableReader> level : repaired.manifest.generations)
-             repairedSSTableCount += level.size();
+         int repairedSSTableCount = repaired.manifest.getSSTables().size();
          assertEquals(1, repairedSSTableCount);
          // make sure the repaired sstable ends up in the same level in the 
repaired manifest:
-         assertTrue(repaired.manifest.generations[2].contains(sstable1));
+         assertTrue(repaired.manifest.getLevel(2).contains(sstable1));
          // and that it is gone from unrepaired
-         assertFalse(unrepaired.manifest.generations[2].contains(sstable1));
+         assertFalse(unrepaired.manifest.getLevel(2).contains(sstable1));
  
          unrepaired.removeSSTable(sstable2);
 -        manager.handleNotification(new SSTableAddedNotification(singleton(sstable2)), this);
 +        manager.handleNotification(new SSTableAddedNotification(singleton(sstable2), null), this);
          assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2));
          assertFalse(repaired.manifest.getLevel(1).contains(sstable2));
      }
@@@ -548,68 -482,216 +545,282 @@@
      }
  
      @Test
 +    public void testDisableSTCSInL0() throws IOException
 +    {
 +        /*
 +        First creates a bunch of sstables in L1, then overloads L0 with 50 sstables. Now with STCS in L0 enabled
 +        we should get a compaction task where the target level is 0, then we disable STCS-in-L0 and make sure that
 +        the compaction task we get targets L1 or higher.
 +         */
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        Map<String, String> localOptions = new HashMap<>();
 +        localOptions.put("class", "LeveledCompactionStrategy");
 +        localOptions.put("sstable_size_in_mb", "1");
 +        cfs.setCompactionParameters(localOptions);
 +        List<SSTableReader> sstables = new ArrayList<>();
 +        for (int i = 0; i < 11; i++)
 +        {
 +            SSTableReader l1sstable = MockSchema.sstable(i, 1 * 1024 * 1024, cfs);
 +            l1sstable.descriptor.getMetadataSerializer().mutateLevel(l1sstable.descriptor, 1);
 +            l1sstable.reloadSSTableMetadata();
 +            sstables.add(l1sstable);
 +        }
 +
 +        for (int i = 100; i < 150; i++)
 +            sstables.add(MockSchema.sstable(i, 1 * 1024 * 1024, cfs));
 +
 +        cfs.disableAutoCompaction();
 +        cfs.addSSTables(sstables);
 +        assertEquals(0, getTaskLevel(cfs));
 +
 +        try
 +        {
 +            CompactionManager.instance.setDisableSTCSInL0(true);
 +            assertTrue(getTaskLevel(cfs) > 0);
 +        }
 +        finally
 +        {
 +            CompactionManager.instance.setDisableSTCSInL0(false);
 +        }
 +    }
 +
 +    private int getTaskLevel(ColumnFamilyStore cfs)
 +    {
 +        int level = -1;
 +        for (List<AbstractCompactionStrategy> strategies : cfs.getCompactionStrategyManager().getStrategies())
 +        {
 +            for (AbstractCompactionStrategy strategy : strategies)
 +            {
 +                AbstractCompactionTask task = strategy.getNextBackgroundTask(0);
 +                if (task != null)
 +                {
 +                    try
 +                    {
 +                        assertTrue(task instanceof LeveledCompactionTask);
 +                        LeveledCompactionTask lcsTask = (LeveledCompactionTask) task;
 +                        level = Math.max(level, lcsTask.getLevel());
 +                    }
 +                    finally
 +                    {
 +                        task.transaction.abort();
 +                    }
 +                }
 +            }
 +        }
 +        return level;
 +    }
++
++    @Test
+     public void testAddingOverlapping()
+     {
+         ColumnFamilyStore cfs = MockSchema.newCFS();
+         LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions());
+         List<SSTableReader> currentLevel = new ArrayList<>();
+         int gen = 1;
+         currentLevel.add(MockSchema.sstableWithLevel(gen++, 10, 20, 1, cfs));
+         currentLevel.add(MockSchema.sstableWithLevel(gen++, 21, 30, 1, cfs));
+         currentLevel.add(MockSchema.sstableWithLevel(gen++, 51, 100, 1, cfs));
+         currentLevel.add(MockSchema.sstableWithLevel(gen++, 80, 120, 1, cfs));
+         currentLevel.add(MockSchema.sstableWithLevel(gen++, 90, 150, 1, cfs));
+ 
+         lm.addSSTables(currentLevel);
+         assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3));
+         assertLevelsEqual(lm.getLevel(0), currentLevel.subList(3, 5));
+ 
+         List<SSTableReader> newSSTables = new ArrayList<>();
+         // this sstable's last token is the same as the first token of L1 above, so it should get sent to L0:
+         newSSTables.add(MockSchema.sstableWithLevel(gen++, 5, 10, 1, cfs));
+         lm.addSSTables(newSSTables);
+         assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3));
+         assertEquals(0, newSSTables.get(0).getSSTableLevel());
+         assertTrue(lm.getLevel(0).containsAll(newSSTables));
+ 
+         newSSTables.clear();
+         newSSTables.add(MockSchema.sstableWithLevel(gen++, 30, 40, 1, cfs));
+         lm.addSSTables(newSSTables);
+         assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3));
+         assertEquals(0, newSSTables.get(0).getSSTableLevel());
+         assertTrue(lm.getLevel(0).containsAll(newSSTables));
+ 
+         newSSTables.clear();
+         newSSTables.add(MockSchema.sstableWithLevel(gen++, 100, 140, 1, cfs));
+         lm.addSSTables(newSSTables);
+         assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3));
+         assertEquals(0, newSSTables.get(0).getSSTableLevel());
+         assertTrue(lm.getLevel(0).containsAll(newSSTables));
+ 
+         newSSTables.clear();
+         newSSTables.add(MockSchema.sstableWithLevel(gen++, 100, 140, 1, cfs));
+         newSSTables.add(MockSchema.sstableWithLevel(gen++, 120, 140, 1, cfs));
+         lm.addSSTables(newSSTables);
+         List<SSTableReader> newL1 = new ArrayList<>(currentLevel.subList(0, 3));
+         newL1.add(newSSTables.get(1));
+         assertLevelsEqual(lm.getLevel(1), newL1);
+         newSSTables.remove(1);
+         assertTrue(newSSTables.stream().allMatch(s -> s.getSSTableLevel() == 0));
+         assertTrue(lm.getLevel(0).containsAll(newSSTables));
+     }
+ 
+     @Test
+     public void singleTokenSSTableTest()
+     {
+         ColumnFamilyStore cfs = MockSchema.newCFS();
+         LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions());
+         List<SSTableReader> expectedL1 = new ArrayList<>();
+ 
+         int gen = 1;
+         // single sstable, single token (100)
+         expectedL1.add(MockSchema.sstableWithLevel(gen++, 100, 100, 1, cfs));
+         lm.addSSTables(expectedL1);
+ 
+         List<SSTableReader> expectedL0 = new ArrayList<>();
+ 
+         // should get moved to L0:
+         expectedL0.add(MockSchema.sstableWithLevel(gen++, 99, 101, 1, cfs));
+         expectedL0.add(MockSchema.sstableWithLevel(gen++, 100, 101, 1, cfs));
+         expectedL0.add(MockSchema.sstableWithLevel(gen++, 99, 100, 1, cfs));
+         expectedL0.add(MockSchema.sstableWithLevel(gen++, 100, 100, 1, cfs));
+         lm.addSSTables(expectedL0);
+ 
+         assertLevelsEqual(expectedL0, lm.getLevel(0));
+         assertTrue(expectedL0.stream().allMatch(s -> s.getSSTableLevel() == 0));
+         assertLevelsEqual(expectedL1, lm.getLevel(1));
+         assertTrue(expectedL1.stream().allMatch(s -> s.getSSTableLevel() == 1));
+ 
+         // should work:
+         expectedL1.add(MockSchema.sstableWithLevel(gen++, 98, 99, 1, cfs));
+         expectedL1.add(MockSchema.sstableWithLevel(gen++, 101, 101, 1, cfs));
+         lm.addSSTables(expectedL1.subList(1, expectedL1.size()));
+         assertLevelsEqual(expectedL1, lm.getLevel(1));
+     }
+ 
+     @Test
+     public void randomMultiLevelAddTest()
+     {
+         int iterations = 100;
+         int levelCount = 8;
+ 
+         ColumnFamilyStore cfs = MockSchema.newCFS();
+         LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions());
+         long seed = System.currentTimeMillis();
+         Random r = new Random(seed);
+         List<SSTableReader> newLevels = generateNewRandomLevels(cfs, 40, levelCount, 0, r);
+ 
+         int sstableCount = newLevels.size();
+         lm.addSSTables(newLevels);
+ 
+         int [] expectedLevelSizes = lm.getAllLevelSize();
+ 
+         for (int j = 0; j < iterations; j++)
+         {
+             newLevels = generateNewRandomLevels(cfs, 20, levelCount, sstableCount, r);
+             sstableCount += newLevels.size();
+ 
+             int[] canAdd = canAdd(lm, newLevels, levelCount);
+             for (int i = 0; i < levelCount; i++)
+                 expectedLevelSizes[i] += canAdd[i];
+             lm.addSSTables(newLevels);
+         }
+ 
+         // and verify no levels overlap
+         int actualSSTableCount = 0;
+         for (int i = 0; i < levelCount; i++)
+         {
+             actualSSTableCount += lm.getLevelSize(i);
+             List<SSTableReader> level = new ArrayList<>(lm.getLevel(i));
+             int lvl = i;
+             assertTrue(level.stream().allMatch(s -> s.getSSTableLevel() == lvl));
+             if (i > 0)
+             {
+                 level.sort(SSTableReader.sstableComparator);
+                 SSTableReader prev = null;
+                 for (SSTableReader sstable : level)
+                 {
+                     if (prev != null && sstable.first.compareTo(prev.last) <= 0)
+                     {
+                         String levelStr = level.stream().map(s -> String.format("[%s, %s]", s.first, s.last)).collect(Collectors.joining(", "));
+                         String overlap = String.format("sstable [%s, %s] overlaps with [%s, %s] in level %d (%s) ", sstable.first, sstable.last, prev.first, prev.last, i, levelStr);
+                         Assert.fail("[seed = " + seed + "] overlap in level " + lvl + ": " + overlap);
+                     }
+                     prev = sstable;
+                 }
+             }
+         }
+         assertEquals(sstableCount, actualSSTableCount);
+         for (int i = 0; i < levelCount; i++)
+             assertEquals("[seed = " + seed + "] wrong sstable count in level = " + i, expectedLevelSizes[i], lm.getLevel(i).size());
+     }
+ 
+     private static List<SSTableReader> generateNewRandomLevels(ColumnFamilyStore cfs, int maxSSTableCountPerLevel, int levelCount, int startGen, Random r)
+     {
+         List<SSTableReader> newLevels = new ArrayList<>();
+         for (int level = 0; level < levelCount; level++)
+         {
+             int numLevelSSTables = r.nextInt(maxSSTableCountPerLevel) + 1;
+             List<Integer> tokens = new ArrayList<>(numLevelSSTables * 2);
+ 
+             for (int i = 0; i < numLevelSSTables * 2; i++)
+                 tokens.add(r.nextInt(4000));
+             Collections.sort(tokens);
+             for (int i = 0; i < tokens.size() - 1; i += 2)
+             {
+                 SSTableReader sstable = MockSchema.sstableWithLevel(++startGen, tokens.get(i), tokens.get(i + 1), level, cfs);
+                 newLevels.add(sstable);
+             }
+         }
+         return newLevels;
+     }
+ 
+     /**
+      * brute-force checks if the new sstables can be added to the correct level in the manifest
+      *
+      * @return count of expected sstables to add to each level
+      */
+     private static int[] canAdd(LeveledManifest lm, List<SSTableReader> newSSTables, int levelCount)
+     {
+         Map<Integer, Collection<SSTableReader>> sstableGroups = new HashMap<>();
+         newSSTables.forEach(s -> sstableGroups.computeIfAbsent(s.getSSTableLevel(), k -> new ArrayList<>()).add(s));
+ 
+         int[] canAdd = new int[levelCount];
+         for (Map.Entry<Integer, Collection<SSTableReader>> lvlGroup : sstableGroups.entrySet())
+         {
+             int level = lvlGroup.getKey();
+             if (level == 0)
+             {
+                 canAdd[0] += lvlGroup.getValue().size();
+                 continue;
+             }
+ 
+             List<SSTableReader> newLevel = new ArrayList<>(lm.getLevel(level));
+             for (SSTableReader sstable : lvlGroup.getValue())
+             {
+                 newLevel.add(sstable);
+                 newLevel.sort(SSTableReader.sstableComparator);
+ 
+                 SSTableReader prev = null;
+                 boolean kept = true;
+                 for (SSTableReader sst : newLevel)
+                 {
+                     if (prev != null && prev.last.compareTo(sst.first) >= 0)
+                     {
+                         newLevel.remove(sstable);
+                         kept = false;
+                         break;
+                     }
+                     prev = sst;
+                 }
+                 if (kept)
+                     canAdd[level] += 1;
+                 else
+                     canAdd[0] += 1;
+             }
+         }
+         return canAdd;
+     }
+ 
+     private static void assertLevelsEqual(Collection<SSTableReader> l1, Collection<SSTableReader> l2)
+     {
+         assertEquals(l1.size(), l2.size());
+         assertEquals(new HashSet<>(l1), new HashSet<>(l2));
+     }
  }
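One pattern worth noting in randomMultiLevelAddTest above: the seed is generated once, fed to Random, and baked into every assertion message, so a red run can be replayed deterministically by hard-coding the printed seed. A minimal sketch of the idiom:

    import java.util.Random;
    import org.junit.Assert;
    import org.junit.Test;

    public class SeededRandomTest
    {
        @Test
        public void replayableRandomness()
        {
            // generate the seed up front and echo it on failure so the exact
            // sequence of random inputs can be reproduced
            long seed = System.currentTimeMillis();
            Random r = new Random(seed);
            int token = r.nextInt(4000);
            Assert.assertTrue("[seed = " + seed + "] unexpected token " + token, token >= 0);
        }
    }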
diff --cc test/unit/org/apache/cassandra/db/compaction/LeveledGenerationsTest.java
index 0000000,1f17bf8..0e20d63
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledGenerationsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledGenerationsTest.java
@@@ -1,0 -1,199 +1,199 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.compaction;
+ 
+ import java.util.Collections;
+ import java.util.Iterator;
+ import java.util.List;
+ 
+ import com.google.common.collect.Iterables;
+ import com.google.common.collect.Lists;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
 -import org.apache.cassandra.MockSchema;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.cql3.CQLTester;
+ import org.apache.cassandra.db.BufferDecoratedKey;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.DecoratedKey;
+ import org.apache.cassandra.dht.Murmur3Partitioner;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
++import org.apache.cassandra.schema.MockSchema;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ 
+ import static junit.framework.Assert.assertFalse;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.fail;
+ 
+ public class LeveledGenerationsTest extends CQLTester
+ {
+     @BeforeClass
+     public static void setUp()
+     {
+         DatabaseDescriptor.daemonInitialization();
+         MockSchema.cleanup();
+     }
+ 
+     @Test
+     public void testWrappingIterable()
+     {
+         ColumnFamilyStore cfs = MockSchema.newCFS();
+ 
+         LeveledGenerations gens = new LeveledGenerations();
+ 
+         for (int i = 0; i < 10; i++)
+         {
+             SSTableReader sstable = MockSchema.sstable(i, 5, true, i, i, 2, cfs);
+             gens.addAll(Collections.singleton(sstable));
+         }
+         int gen = 10;
+         assertIter(gens.wrappingIterator(2, sst(++gen, cfs, 5, 5)),
+                    6, 5, 10);
+         assertIter(gens.wrappingIterator(2, null),
+                    0, 9, 10);
+         assertIter(gens.wrappingIterator(2, sst(++gen, cfs, -10, 0)),
+                    1, 0, 10);
+         assertIter(gens.wrappingIterator(2, sst(++gen, cfs, 5, 9)),
+                    0, 9, 10);
+         assertIter(gens.wrappingIterator(2, sst(++gen, cfs, 0, 1000)),
+                    0, 9, 10);
+ 
+         gens.addAll(Collections.singleton(MockSchema.sstable(100, 5, true, 5, 10, 3, cfs)));
+         assertIter(gens.wrappingIterator(3, sst(++gen, cfs, -10, 0)),
+                    5, 5, 1);
+         assertIter(gens.wrappingIterator(3, sst(++gen, cfs, 0, 100)),
+                    5, 5, 1);
+ 
+         gens.addAll(Collections.singleton(MockSchema.sstable(200, 5, true, 5, 10, 4, cfs)));
+         gens.addAll(Collections.singleton(MockSchema.sstable(201, 5, true, 40, 50, 4, cfs)));
+         assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 0, 0)),
+                    5, 40, 2);
+         assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 0, 5)),
+                    40, 5, 2);
+         assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 7, 8)),
+                    40, 5, 2);
+         assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 39, 39)),
+                    40, 5, 2);
+         assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 40, 40)),
+                    5, 40, 2);
+         assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 100, 1000)),
+                    5, 40, 2);
+     }
+ 
+     @Test
+     public void testWrappingIterableWiderSSTables()
+     {
+         ColumnFamilyStore cfs = MockSchema.newCFS();
+         LeveledGenerations generations = new LeveledGenerations();
+         int gen = 0;
+         generations.addAll(Lists.newArrayList(
+             sst(++gen, cfs, 0, 50),
+             sst(++gen, cfs, 51, 100),
+             sst(++gen, cfs, 150, 200)));
+ 
+         assertIter(generations.wrappingIterator(2, sst(++gen, cfs, -100, -50)),
+                    0, 150, 3);
+ 
+         assertIter(generations.wrappingIterator(2, sst(++gen, cfs, 0, 40)),
+                    51, 0, 3);
+         assertIter(generations.wrappingIterator(2, sst(++gen, cfs, 0, 50)),
+                    51, 0, 3);
+         assertIter(generations.wrappingIterator(2, sst(++gen, cfs, 0, 51)),
+                    150, 51, 3);
+ 
+         assertIter(generations.wrappingIterator(2, sst(++gen, cfs, 100, 149)),
+                    150, 51, 3);
+         assertIter(generations.wrappingIterator(2, sst(++gen, cfs, 100, 300)),
+                    0, 150, 3);
+ 
+     }
+ 
+     @Test
+     public void testEmptyLevel()
+     {
+         ColumnFamilyStore cfs = MockSchema.newCFS();
+         LeveledGenerations generations = new LeveledGenerations();
+         assertFalse(generations.wrappingIterator(3, sst(0, cfs, 0, 10)).hasNext());
+         assertFalse(generations.wrappingIterator(3, null).hasNext());
+     }
+ 
+     @Test
+     public void testFillLevels()
+     {
+         LeveledGenerations generations = new LeveledGenerations();
+ 
+         ColumnFamilyStore cfs = MockSchema.newCFS();
+         for (int i = 0; i < LeveledGenerations.MAX_LEVEL_COUNT; i++)
+             generations.addAll(Collections.singleton(MockSchema.sstableWithLevel(i, i, i, i, cfs)));
+ 
+         for (int i = 0; i < generations.levelCount(); i++)
+             assertEquals(i, generations.get(i).iterator().next().getSSTableLevel());
+ 
+         assertEquals(9, generations.levelCount());
+ 
+         try
+         {
+             generations.get(9);
+             fail("don't have 9 generations");
+         }
+         catch (ArrayIndexOutOfBoundsException e)
+         {}
+         try
+         {
+             generations.get(-1);
+             fail("don't have -1 generations");
+         }
+         catch (ArrayIndexOutOfBoundsException e)
+         {}
+     }
+ 
+     private void assertIter(Iterator<SSTableReader> iter, long first, long last, int expectedCount)
+     {
+         List<SSTableReader> drained = Lists.newArrayList(iter);
+         assertEquals(expectedCount, drained.size());
+         assertEquals(dk(first).getToken(), first(drained).first.getToken());
+         assertEquals(dk(last).getToken(), last(drained).first.getToken()); // we sort by first token, so this is the first token of the last sstable in iter
+     }
+ 
+     private SSTableReader last(Iterable<SSTableReader> iter)
+     {
+         return Iterables.getLast(iter);
+     }
+     private SSTableReader first(Iterable<SSTableReader> iter)
+     {
+         SSTableReader first = Iterables.getFirst(iter, null);
+         if (first == null)
+             throw new RuntimeException();
+         return first;
+     }
+ 
+     private DecoratedKey dk(long x)
+     {
+         return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(x), ByteBufferUtil.bytes(x));
+     }
+     private SSTableReader sst(int gen, ColumnFamilyStore cfs, long first, long last)
+     {
+         return MockSchema.sstable(gen, 5, true, first, last, 2, cfs);
+     }
+ 
+     private void print(SSTableReader sstable)
+     {
+         System.out.println(String.format("%d %s %s %d", sstable.descriptor.generation, sstable.first, sstable.last, sstable.getSSTableLevel()));
+     }
+ }
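The wrap-around cases above all reduce to the javadoc example on LeveledGenerations.wrappingIterator: given first tokens [0, 10, 20, 30] and a last-compacted token of 15, iteration should yield [20, 30, 0, 10]. A sketch of the same wrap on plain longs (the real code compares each pivot candidate's first token against lastCompactedSSTable.last and falls back to a plain iterator when nothing sorts after it):

    import java.util.Iterator;
    import java.util.TreeSet;
    import com.google.common.collect.Iterators;

    final class WrapSketch
    {
        public static void main(String[] args)
        {
            TreeSet<Long> firsts = new TreeSet<>();
            for (long t : new long[]{ 0, 10, 20, 30 })
                firsts.add(t);

            long lastCompacted = 15;
            Long pivot = firsts.higher(lastCompacted); // 20: first element strictly after 15
            Iterator<Long> it = pivot == null
                              ? firsts.iterator()
                              : Iterators.concat(firsts.tailSet(pivot).iterator(),  // 20, 30
                                                 firsts.headSet(pivot).iterator()); // 0, 10
            while (it.hasNext())
                System.out.print(it.next() + " "); // prints: 20 30 0 10
        }
    }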
diff --cc test/unit/org/apache/cassandra/schema/MockSchema.java
index 5ce8520,0000000..7d2d874
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/schema/MockSchema.java
+++ b/test/unit/org/apache/cassandra/schema/MockSchema.java
@@@ -1,222 -1,0 +1,233 @@@
 +/*
 +* Licensed to the Apache Software Foundation (ASF) under one
 +* or more contributor license agreements.  See the NOTICE file
 +* distributed with this work for additional information
 +* regarding copyright ownership.  The ASF licenses this file
 +* to you under the Apache License, Version 2.0 (the
 +* "License"); you may not use this file except in compliance
 +* with the License.  You may obtain a copy of the License at
 +*
 +*    http://www.apache.org/licenses/LICENSE-2.0
 +*
 +* Unless required by applicable law or agreed to in writing,
 +* software distributed under the License is distributed on an
 +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +* KIND, either express or implied.  See the License for the
 +* specific language governing permissions and limitations
 +* under the License.
 +*/
 +package org.apache.cassandra.schema;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.io.RandomAccessFile;
 +import java.util.*;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.function.Function;
 +
 +import com.google.common.collect.ImmutableSet;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.marshal.UTF8Type;
 +import org.apache.cassandra.dht.Murmur3Partitioner;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.IndexSummary;
 +import org.apache.cassandra.io.sstable.format.SSTableFormat;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.io.sstable.metadata.MetadataType;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.io.util.ChannelProxy;
 +import org.apache.cassandra.io.util.FileHandle;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.io.util.Memory;
 +import org.apache.cassandra.utils.AlwaysPresentFilter;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
 +
 +public class MockSchema
 +{
 +    static
 +    {
 +        Memory offsets = Memory.allocate(4);
 +        offsets.setInt(0, 0);
 +        indexSummary = new IndexSummary(Murmur3Partitioner.instance, offsets, 0, Memory.allocate(4), 0, 0, 0, 1);
 +    }
 +    private static final AtomicInteger id = new AtomicInteger();
 +    public static final Keyspace ks = Keyspace.mockKS(KeyspaceMetadata.create("mockks", KeyspaceParams.simpleTransient(1)));
 +
 +    public static final IndexSummary indexSummary;
 +
 +    private static final File tempFile = temp("mocksegmentedfile");
 +
 +    public static Memtable memtable(ColumnFamilyStore cfs)
 +    {
 +        return new Memtable(cfs.metadata());
 +    }
 +
 +    public static SSTableReader sstable(int generation, ColumnFamilyStore cfs)
 +    {
 +        return sstable(generation, false, cfs);
 +    }
 +
 +    public static SSTableReader sstable(int generation, long first, long last, ColumnFamilyStore cfs)
 +    {
 +        return sstable(generation, 0, false, first, last, cfs);
 +    }
 +
 +    public static SSTableReader sstable(int generation, boolean keepRef, ColumnFamilyStore cfs)
 +    {
 +        return sstable(generation, 0, keepRef, cfs);
 +    }
 +
 +    public static SSTableReader sstable(int generation, int size, ColumnFamilyStore cfs)
 +    {
 +        return sstable(generation, size, false, cfs);
 +    }
 +    public static SSTableReader sstable(int generation, int size, boolean keepRef, ColumnFamilyStore cfs)
 +    {
 +        return sstable(generation, size, keepRef, generation, generation, cfs);
 +    }
 +
++    public static SSTableReader sstableWithLevel(int generation, long firstToken, long lastToken, int level, ColumnFamilyStore cfs)
++    {
++        return sstable(generation, 0, false, firstToken, lastToken, level, cfs);
++    }
++
 +    public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, ColumnFamilyStore cfs)
 +    {
++        return sstable(generation, size, keepRef, firstToken, lastToken, 0, cfs);
++    }
++
++    public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, int level, ColumnFamilyStore cfs)
++    {
 +        Descriptor descriptor = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(),
 +                                               cfs.keyspace.getName(),
 +                                               cfs.getTableName(),
 +                                               generation, SSTableFormat.Type.BIG);
 +        Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC);
 +        for (Component component : components)
 +        {
 +            File file = new File(descriptor.filenameFor(component));
 +            try
 +            {
 +                file.createNewFile();
 +            }
 +            catch (IOException e)
 +            {
 +            }
 +        }
 +        // .complete() with size to make sstable.onDiskLength work
 +        try (FileHandle.Builder builder = new FileHandle.Builder(new ChannelProxy(tempFile)).bufferSize(size);
 +             FileHandle fileHandle = builder.complete(size))
 +        {
 +            if (size > 0)
 +            {
 +                try
 +                {
 +                    File file = new File(descriptor.filenameFor(Component.DATA));
 +                    try (RandomAccessFile raf = new RandomAccessFile(file, "rw"))
 +                    {
 +                        raf.setLength(size);
 +                    }
 +                }
 +                catch (IOException e)
 +                {
 +                    throw new RuntimeException(e);
 +                }
 +            }
 +            SerializationHeader header = SerializationHeader.make(cfs.metadata(), Collections.emptyList());
 +            StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata().comparator)
 +                                                     .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, UNREPAIRED_SSTABLE, null, false, header)
 +                                                     .get(MetadataType.STATS);
 +            SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata,
 +                                                              fileHandle.sharedCopy(), fileHandle.sharedCopy(), indexSummary.sharedCopy(),
 +                                                              new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL, header);
 +            reader.first = readerBounds(firstToken);
 +            reader.last = readerBounds(lastToken);
 +            if (!keepRef)
 +                reader.selfRef().release();
 +            return reader;
 +        }
++
 +    }
 +
 +    public static ColumnFamilyStore newCFS()
 +    {
 +        return newCFS(ks.getName());
 +    }
 +
 +    public static ColumnFamilyStore newCFS(String ksname)
 +    {
 +        return newCFS(newTableMetadata(ksname));
 +    }
 +
 +    public static ColumnFamilyStore newCFS(Function<TableMetadata.Builder, TableMetadata.Builder> options)
 +    {
 +        return newCFS(ks.getName(), options);
 +    }
 +
 +    public static ColumnFamilyStore newCFS(String ksname, Function<TableMetadata.Builder, TableMetadata.Builder> options)
 +    {
 +        return newCFS(options.apply(newTableMetadataBuilder(ksname)).build());
 +    }
 +
 +    public static ColumnFamilyStore newCFS(TableMetadata metadata)
 +    {
 +        return new ColumnFamilyStore(ks, metadata.name, 0, new TableMetadataRef(metadata), new Directories(metadata), false, false, false);
 +    }
 +
 +    public static TableMetadata newTableMetadata(String ksname)
 +    {
 +        return newTableMetadata(ksname, "mockcf" + (id.incrementAndGet()));
 +    }
 +
 +    public static TableMetadata newTableMetadata(String ksname, String cfname)
 +    {
 +        return newTableMetadataBuilder(ksname, cfname).build();
 +    }
 +
 +    public static TableMetadata.Builder newTableMetadataBuilder(String ksname)
 +    {
 +        return newTableMetadataBuilder(ksname, "mockcf" + (id.incrementAndGet()));
 +    }
 +
 +    public static TableMetadata.Builder newTableMetadataBuilder(String ksname, String cfname)
 +    {
 +        return TableMetadata.builder(ksname, cfname)
 +                            .partitioner(Murmur3Partitioner.instance)
 +                            .addPartitionKeyColumn("key", UTF8Type.instance)
 +                            .addClusteringColumn("col", UTF8Type.instance)
 +                            .addRegularColumn("value", UTF8Type.instance)
 +                            .caching(CachingParams.CACHE_NOTHING);
 +    }
 +
 +    public static BufferDecoratedKey readerBounds(long generation)
 +    {
 +        return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(generation), ByteBufferUtil.EMPTY_BYTE_BUFFER);
 +    }
 +
 +    private static File temp(String id)
 +    {
 +        File file = FileUtils.createTempFile(id, "tmp");
 +        file.deleteOnExit();
 +        return file;
 +    }
 +
 +    public static void cleanup()
 +    {
 +        // clean up data directories, which are stored as data directory/keyspace/data files
 +        for (String dirName : DatabaseDescriptor.getAllDataFileLocations())
 +        {
 +            File dir = new File(dirName);
 +            if (!dir.exists())
 +                continue;
 +            String[] children = dir.list();
 +            for (String child : children)
 +                FileUtils.deleteRecursive(new File(dir, child));
 +        }
 +    }
 +}
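The new sstableWithLevel overload is what the LCS tests above lean on: it fabricates a zero-byte reader whose token bounds and level metadata are set without writing real data. A typical usage, mirroring the tests in this commit (and assuming MockSchema.cleanup()/daemonInitialization() have already run, as in LeveledGenerationsTest.setUp):

    ColumnFamilyStore cfs = MockSchema.newCFS();
    // an empty mock sstable covering tokens [10, 20], claiming level 1
    SSTableReader sstable = MockSchema.sstableWithLevel(1, 10, 20, 1, cfs);
    assert sstable.getSSTableLevel() == 1;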

