This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit fa7659253f786284d33f840199dc26602d03363a Merge: 91bcbb2 f41ea9f Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Wed Sep 16 11:05:53 2020 +0200 Merge branch 'cassandra-3.11' into trunk CHANGES.txt | 2 + NEWS.txt | 2 + .../db/compaction/AbstractCompactionStrategy.java | 21 +- .../db/compaction/AbstractStrategyHolder.java | 6 +- .../db/compaction/CompactionStrategyManager.java | 206 +++++------- .../db/compaction/LeveledCompactionStrategy.java | 29 +- .../db/compaction/LeveledGenerations.java | 310 ++++++++++++++++++ .../cassandra/db/compaction/LeveledManifest.java | 354 +++++---------------- .../apache/cassandra/tools/StandaloneScrubber.java | 26 +- .../LongLeveledCompactionStrategyCQLTest.java | 92 ++++++ .../LongLeveledCompactionStrategyTest.java | 2 +- .../compaction/LeveledCompactionStrategyTest.java | 235 +++++++++++++- .../db/compaction/LeveledGenerationsTest.java | 199 ++++++++++++ .../org/apache/cassandra/schema/MockSchema.java | 11 + 14 files changed, 1043 insertions(+), 452 deletions(-) diff --cc CHANGES.txt index 5d8a9fb,117fa96..3c4a810 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,40 -1,9 +1,42 @@@ -3.11.9 +4.0-beta3 + * Allow zero padding in timestamp serialization (CASSANDRA-16105) + * Add byte array backed cells (CASSANDRA-15393) + * Correctly handle pending ranges with adjacent range movements (CASSANDRA-14801) + * Avoid adding locahost when streaming trivial ranges (CASSANDRA-16099) + * Add nodetool getfullquerylog (CASSANDRA-15988) + * Fix yaml format and alignment in tpstats (CASSANDRA-11402) + * Avoid trying to keep track of RTs for endpoints we won't write to during read repair (CASSANDRA-16084) + * When compaction gets interrupted, the exception should include the compactionId (CASSANDRA-15954) + * Make Table/Keyspace Metric Names Consistent With Each Other (CASSANDRA-15909) + * Mutating sstable component may race with entire-sstable-streaming(ZCS) causing checksum validation failure (CASSANDRA-15861) ++Merged from 3.11: + * Make 
sure LCS handles duplicate sstable added/removed notifications correctly (CASSANDRA-14103) -3.11.8 +4.0-beta2 + * Add addition incremental repair visibility to nodetool repair_admin (CASSANDRA-14939) + * Always access system properties and environment variables via the new CassandraRelevantProperties and CassandraRelevantEnv classes (CASSANDRA-15876) + * Remove deprecated HintedHandOffManager (CASSANDRA-15939) + * Prevent repair from overrunning compaction (CASSANDRA-15817) + * fix cqlsh COPY functions in Python 3.8 on Mac (CASSANDRA-16053) + * Strip comment blocks from cqlsh input before processing statements (CASSANDRA-15802) + * Fix unicode chars error input (CASSANDRA-15990) + * Improved testability for CacheMetrics and ChunkCacheMetrics (CASSANDRA-15788) + * Handle errors in StreamSession#prepare (CASSANDRA-15852) + * FQL replay should have options to ignore DDL statements (CASSANDRA-16039) + * Remove COMPACT STORAGE internals (CASSANDRA-13994) + * Make TimestampSerializer accept fractional seconds of varying precision (CASSANDRA-15976) + * Improve cassandra-stress logging when using a profile file that doesn't exist (CASSANDRA-14425) + * Improve logging for socket connection/disconnection (CASSANDRA-15980) + * Throw FSWriteError upon write failures in order to apply DiskFailurePolicy (CASSANDRA-15928) + * Forbid altering UDTs used in partition keys (CASSANDRA-15933) + * Fix version parsing logic when upgrading from 3.0 (CASSANDRA-15973) + * Optimize NoSpamLogger use in hot paths (CASSANDRA-15766) + * Verify sstable components on startup (CASSANDRA-15945) + * Resolve JMX output inconsistencies from CASSANDRA-7544 storage-port-configurable-per-node (CASSANDRA-15937) +Merged from 3.11: * Correctly interpret SASI's `max_compaction_flush_memory_in_mb` setting in megabytes not bytes (CASSANDRA-16071) * Fix short read protection for GROUP BY queries (CASSANDRA-15459) + * stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up 
(CASSANDRA-15191) * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857) Merged from 3.0: * Fix gossip shutdown order (CASSANDRA-15816) diff --cc NEWS.txt index 7f20a7d,b3246b5..4b5264f --- a/NEWS.txt +++ b/NEWS.txt @@@ -33,244 -42,21 +33,246 @@@ restore snapshots created with the prev 'sstableloader' tool. You can upgrade the file format of your snapshots using the provided 'sstableupgrade' tool. -3.11.9 -====== +4.0 +=== + +New features +------------ + - Nodes will now bootstrap all intra-cluster connections at startup by default and wait + 10 seconds for the all but one node in the local data center to be connected and marked + UP in gossip. This prevents nodes from coordinating requests and failing because they + aren't able to connect to the cluster fast enough. block_for_peers_timeout_in_secs in + cassandra.yaml can be used to configure how long to wait (or whether to wait at all) + and block_for_peers_in_remote_dcs can be used to also block on all but one node in + each remote DC as well. See CASSANDRA-14297 and CASSANDRA-13993 for more information. + - *Experimental* support for Transient Replication and Cheap Quorums introduced by CASSANDRA-14404 + The intended audience for this functionality is expert users of Cassandra who are prepared + to validate every aspect of the database for their application and deployment practices. Future + releases of Cassandra will make this feature suitable for a wider audience. + - *Experimental* support for Java 11 has been added. JVM options that differ between or are + specific for Java 8 and 11 have been moved from jvm.options into jvm8.options and jvm11.options. + IMPORTANT: Running C* on Java 11 is *experimental* and do it at your own risk. + - LCS now respects the max_threshold parameter when compacting - this was hard coded to 32 + before, but now it is possible to do bigger compactions when compacting from L0 to L1. 
+ This also applies to STCS-compactions in L0 - if there are more than 32 sstables in L0 + we will compact at most max_threshold sstables in an L0 STCS compaction. See CASSANDRA-14388 + for more information. + - There is now an option to automatically upgrade sstables after Cassandra upgrade, enable + either in `cassandra.yaml:automatic_sstable_upgrade` or via JMX during runtime. See + CASSANDRA-14197. + - `nodetool refresh` has been deprecated in favour of `nodetool import` - see CASSANDRA-6719 + for details + - An experimental option to compare all merkle trees together has been added - for example, in + a 3 node cluster with 2 replicas identical and 1 out-of-date, with this option enabled, the + out-of-date replica will only stream a single copy from up-to-date replica. Enable it by adding + "-os" to nodetool repair. See CASSANDRA-3200. + - The currentTimestamp, currentDate, currentTime and currentTimeUUID functions have been added. + See CASSANDRA-13132 + - Support for arithmetic operations between `timestamp`/`date` and `duration` has been added. + See CASSANDRA-11936 + - Support for arithmetic operations on number has been added. See CASSANDRA-11935 + - Preview expected streaming required for a repair (nodetool repair --preview), and validate the + consistency of repaired data between nodes (nodetool repair --validate). See CASSANDRA-13257 + - Support for selecting Map values and Set elements has been added for SELECT queries. See CASSANDRA-7396 + - Change-Data-Capture has been modified to make CommitLogSegments available + immediately upon creation via hard-linking the files. This means that incomplete + segments will be available in cdc_raw rather than fully flushed. See documentation + and CASSANDRA-12148 for more detail. + - The initial build of materialized views can be parallelized. The number of concurrent builder + threads is specified by the property `cassandra.yaml:concurrent_materialized_view_builders`. 
+ This property can be modified at runtime through both JMX and the new `setconcurrentviewbuilders` + and `getconcurrentviewbuilders` nodetool commands. See CASSANDRA-12245 for more details. + - There is now a binary full query log based on Chronicle Queue that can be controlled using + nodetool enablefullquerylog, disablefullquerylog, and resetfullquerylog. The log + contains all queries invoked, approximate time they were invoked, any parameters necessary + to bind wildcard values, and all query options. A human readable version of the log can be + dumped or tailed using the new bin/fqltool utility. The full query log is designed to be safe + to use in production and limits utilization of heap memory and disk space with limits + you can specify when enabling the log. + See nodetool and fqltool help text for more information. + - SSTableDump now supports the -l option to output each partition as it's own json object + See CASSANDRA-13848 for more detail + - Metric for coordinator writes per table has been added. See CASSANDRA-14232 + - Nodetool cfstats now has options to sort by various metrics as well as limit results. + - Operators can restrict login user activity to one or more datacenters. See `network_authorizer` + in cassandra.yaml, and the docs for create and alter role statements. CASSANDRA-13985 + - Roles altered from login=true to login=false will prevent existing connections from executing any + statements after the cache has been refreshed. CASSANDRA-13985 + - Support for audit logging of database activity. If enabled, logs every incoming + CQL command request, Authentication (successful as well as unsuccessful login) to a node. + - Faster streaming of entire SSTables using ZeroCopy APIs. If enabled, Cassandra will use stream + entire SSTables, significantly speeding up transfers. Any streaming related operations will see + corresponding improvement. See CASSANDRA-14556. 
+ - NetworkTopologyStrategy now supports auto-expanding the replication_factor + option into all available datacenters at CREATE or ALTER time. For example, + specifying replication_factor: 3 translates to three replicas in every + datacenter. This auto-expansion will _only add_ datacenters for safety. + See CASSANDRA-14303 for more details. + - Added Python 3 support so cqlsh and cqlshlib is now compatible with Python 2.7 and Python 3.6. + Added --python option to cqlsh so users can specify the path to their chosen Python interpreter. + See CASSANDRA-10190 for details. + - Support for server side DESCRIBE statements has been added. See CASSANDRA-14825 + Upgrading --------- - - Custom compaction strategies must handle getting sstables added/removed notifications for - sstables already added/removed - see CASSANDRA-14103 for details. This has been a requirement - for correct operation since 3.11.0 due to an issue in CompactionStrategyManager. + - Sstables for tables using with a frozen UDT written by C* 3.0 appear as corrupted. -3.11.7 -====== + Background: The serialization-header in the -Statistics.db sstable component contains the type information + of the table columns. C* 3.0 write incorrect type information for frozen UDTs by omitting the + "frozen" information. Non-frozen UDTs were introduced by CASSANDRA-7423 in C* 3.6. Since then, the missing + "frozen" information leads to deserialization issues that result in CorruptSSTableExceptions, potentially other + exceptions as well. -Upgrading ---------- - - Nothing specific to this release, but please see previous upgrading sections, - especially if you are upgrading from 3.0. + As a mitigation, the sstable serialization-headers are rewritten to contain the missing "frozen" information for + UDTs once, when an upgrade from C* 3.0 is detected. This migration does not touch snapshots or backups. + + The sstablescrub tool now performs a check of the sstable serialization-header against the schema. 
A mismatch of + the types in the serialization-header and the schema will cause sstablescrub to error out and stop by default. + See the new `-e` option. `-e off` disables the new validation code. `-e fix` or `-e fix-only`, e.g. + `sstablescrub -e fix keyspace table`, will validate the serialization-header, rewrite the non-frozen UDTs + in the serialization-header to frozen UDTs, if that matches the schema, and continue with scrub. + See `sstablescrub -h`. + (CASSANDRA-15035) + - CASSANDRA-13241 lowered the default chunk_length_in_kb for compressed tables from + 64kb to 16kb. For highly compressible data this can have a noticeable impact + on space utilization. You may want to consider manually specifying this value. + - Additional columns have been added to system_distributed.repair_history, + system_traces.sessions and system_traces.events. As a result select queries + against these tables - including queries against tracing tables performed + automatically by the drivers and cqlsh - will fail and generate an error in the log + during upgrade when the cluster is mixed version. On 3.x side this will also lead + to broken internode connections and lost messages. + Cassandra versions 3.0.20 and 3.11.6 pre-add these columns (see CASSANDRA-15385), + so please make sure to upgrade to those versions or higher before upgrading to + 4.0 for query tracing to not cause any issues during the upgrade to 4.0. + - Timestamp ties between values resolve differently: if either value has a TTL, + this value always wins. This is to provide consistent reconciliation before + and after the value expires into a tombstone. + - Cassandra 4.0 removed support for COMPACT STORAGE tables. All Compact Tables + have to be migrated using `ALTER ... DROP COMPACT STORAGE` statement in 3.0/3.11. + Cassandra starting 4.0 will not start if flags indicate that the table is non-CQL. + Syntax for creating compact tables is also deprecated. 
+ - Support for legacy auth tables in the system_auth keyspace (users, + permissions, credentials) and the migration code has been removed. Migration + of these legacy auth tables must have been completed before the upgrade to + 4.0 and the legacy tables must have been removed. See the 'Upgrading' section + for version 2.2 for migration instructions. + - Cassandra 4.0 removed support for the deprecated Thrift interface. Amongst + other things, this implies the removal of all yaml options related to thrift + ('start_rpc', rpc_port, ...). + - Cassandra 4.0 removed support for any pre-3.0 format. This means you + cannot upgrade from a 2.x version to 4.0 directly, you have to upgrade to + a 3.0.x/3.x version first (and run upgradesstable). In particular, this + mean Cassandra 4.0 cannot load or read pre-3.0 sstables in any way: you + will need to upgrade those sstable in 3.0.x/3.x first. + - Upgrades from 3.0.x or 3.x are supported since 3.0.13 or 3.11.0, previous + versions will causes issues during rolling upgrades (CASSANDRA-13274). + - Cassandra will no longer allow invalid keyspace replication options, such + as invalid datacenter names for NetworkTopologyStrategy. Operators MUST + add new nodes to a datacenter before they can set set ALTER or CREATE + keyspace replication policies using that datacenter. Existing keyspaces + will continue to operate, but CREATE and ALTER will validate that all + datacenters specified exist in the cluster. + - Cassandra 4.0 fixes a problem with incremental repair which caused repaired + data to be inconsistent between nodes. The fix changes the behavior of both + full and incremental repairs. For full repairs, data is no longer marked + repaired. For incremental repairs, anticompaction is run at the beginning + of the repair, instead of at the end. If incremental repair was being used + prior to upgrading, a full repair should be run after upgrading to resolve + any inconsistencies. 
+ - Config option index_interval has been removed (it was deprecated since 2.0) + - Deprecated repair JMX APIs are removed. + - The version of snappy-java has been upgraded to 1.1.2.6 + - the minimum value for internode message timeouts is 10ms. Previously, any + positive value was allowed. See cassandra.yaml entries like + read_request_timeout_in_ms for more details. + - Cassandra 4.0 allows a single port to be used for both secure and insecure + connections between cassandra nodes (CASSANDRA-10404). See the yaml for + specific property changes, and see the security doc for full details. + - Due to the parallelization of the initial build of materialized views, + the per token range view building status is stored in the new table + `system.view_builds_in_progress`. The old table `system.views_builds_in_progress` + is no longer used and can be removed. See CASSANDRA-12245 for more details. + - Config option commitlog_sync_batch_window_in_ms has been deprecated as its + documentation has been incorrect and the setting itself near useless. + Batch mode remains a valid commit log mode, however. + - There is a new commit log mode, group, which is similar to batch mode + but blocks for up to a configurable number of milliseconds between disk flushes. + - nodetool clearsnapshot now requires the --all flag to remove all snapshots. + Previous behavior would delete all snapshots by default. + - Nodes are now identified by a combination of IP, and storage port. + Existing JMX APIs, nodetool, and system tables continue to work + and accept/return just an IP, but there is a new + version of each that works with the full unambiguous identifier. + You should prefer these over the deprecated ambiguous versions that only + work with an IP. This was done to support multiple instances per IP. 
+ Additionally we are moving to only using a single port for encrypted and + unencrypted traffic and if you want multiple instances per IP you must + first switch encrypted traffic to the storage port and not a separate + encrypted port. If you want to use multiple instances per IP + with SSL you will need to use StartTLS on storage_port and set + outgoing_encrypted_port_source to gossip outbound connections + know what port to connect to for each instance. Before changing + storage port or native port at nodes you must first upgrade the entire cluster + and clients to 4.0 so they can handle the port not being consistent across + the cluster. + - Names of AWS regions/availability zones have been cleaned up to more correctly + match the Amazon names. There is now a new option in conf/cassandra-rackdc.properties + that lets users enable the correct names for new clusters, or use the legacy + names for existing clusters. See conf/cassandra-rackdc.properties for details. + - Background repair has been removed. dclocal_read_repair_chance and + read_repair_chance table options have been removed and are now rejected. + See CASSANDRA-13910 for details. + - Internode TCP connections that do not ack segments for 30s will now + be automatically detected and closed via the Linux TCP_USER_TIMEOUT + socket option. This should be exceedingly rare, but AWS networks (and + other stateful firewalls) apparently suffer from this issue. You can + tune the timeouts on TCP connection and segment ack via the + `cassandra.yaml:internode_tcp_connect_timeout_in_ms` and + `cassandra.yaml:internode_tcp_user_timeout_in_ms` options respectively. + See CASSANDRA-14358 for details. + - repair_session_space_in_mb setting has been added to cassandra.yaml to allow operators to reduce + merkle tree size if repair is creating too much heap pressure. The repair_session_max_tree_depth + setting added in 3.0.19 and 3.11.5 is deprecated in favor of this setting. 
See CASSANDRA-14096 + - The flags 'enable_materialized_views' and 'enable_sasi_indexes' in cassandra.yaml + have been set as false by default. Operators should modify them to allow the + creation of new views and SASI indexes, the existing ones will continue working. + See CASSANDRA-14866 for details. + - CASSANDRA-15216 - The flag 'cross_node_timeout' has been set as true by default. + This change is done under the assumption that users have setup NTP on + their clusters or otherwise synchronize their clocks, and that clocks are + mostly in sync, since this is a requirement for general correctness of + last write wins. + - CASSANDRA-15257 removed the joda time dependency. Any time formats + passed will now need to conform to java.time.format.DateTimeFormatter. + Most notably, days and months must be two digits, and years exceeding + four digits need to be prefixed with a plus or minus sign. + - cqlsh now returns a non-zero code in case of errors. This is a backward incompatible change so it may + break existing scripts that rely on the current behavior. See CASSANDRA-15623 for more details. + - Updated the default compaction_throughput_mb_per_sec to 64. The original + default (16) was meant for spinning disk volumes. See CASSANDRA-14902 for details. ++ - Custom compaction strategies must now handle getting sstables added/removed notifications for ++ sstables already added/removed - see CASSANDRA-14103 for details. + + +Deprecation +----------- + + - The JMX MBean org.apache.cassandra.db:type=BlacklistedDirectories has been + deprecated in favor of org.apache.cassandra.db:type=DisallowedDirectories + and will be removed in a subsequent major version. + + +Materialized Views +------------------- + - Following a discussion regarding concerns about the design and safety of Materialized Views, the C* development + community no longer recommends them for production use, and considers them experimental. Warning messages will + now be logged when they are created. 
(See https://www.mail-archive.com/dev@cassandra.apache.org/msg11511.html) + - An 'enable_materialized_views' flag has been added to cassandra.yaml to allow operators to prevent creation of + views + - CREATE MATERIALIZED VIEW syntax has become stricter. Partition key columns are no longer implicitly considered + to be NOT NULL, and no base primary key columns get automatically included in view definition. You have to + specify them explicitly now. 3.11.6 ====== diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index c764d6e,4298be8..0b37c22 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@@ -273,33 -336,11 +284,39 @@@ public abstract class AbstractCompactio addSSTable(sstable); } + /** + * Removes sstable from the strategy, implementations must be able to handle the sstable having already been removed. + */ public abstract void removeSSTable(SSTableReader sstable); ++ /** ++ * Removes sstables from the strategy, implementations must be able to handle the sstables having already been removed. ++ */ + public void removeSSTables(Iterable<SSTableReader> removed) + { + for (SSTableReader sstable : removed) + removeSSTable(sstable); + } + + /** + * Returns the sstables managed by this strategy instance + */ + @VisibleForTesting + protected abstract Set<SSTableReader> getSSTables(); + + /** + * Called when the metadata has changed for an sstable - for example if the level changed + * + * Not called when repair status changes (which is also metadata), because this results in the + * sstable getting removed from the compaction strategy instance. 
+ * + * @param oldMetadata + * @param sstable + */ + public void metadataChanged(StatsMetadata oldMetadata, SSTableReader sstable) + { + } + public static class ScannerList implements AutoCloseable { public final List<ISSTableScanner> scanners; diff --cc src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java index 63b7909,0000000..95fc7b8 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java @@@ -1,210 -1,0 +1,210 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.compaction; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.function.Supplier; + +import com.google.common.base.Preconditions; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.schema.CompactionParams; + +/** + * Wrapper that's aware of how sstables are divided between separate strategies, + * and provides a standard interface to them + * + * not threadsafe, calls must be synchronized by caller + */ +public abstract class AbstractStrategyHolder +{ + public static class TaskSupplier implements Comparable<TaskSupplier> + { + private final int numRemaining; + private final Supplier<AbstractCompactionTask> supplier; + + TaskSupplier(int numRemaining, Supplier<AbstractCompactionTask> supplier) + { + this.numRemaining = numRemaining; + this.supplier = supplier; + } + + public AbstractCompactionTask getTask() + { + return supplier.get(); + } + + public int compareTo(TaskSupplier o) + { + return o.numRemaining - numRemaining; + } + } + + public static interface DestinationRouter + { + int getIndexForSSTable(SSTableReader sstable); + int getIndexForSSTableDirectory(Descriptor descriptor); + } + + /** + * Maps sstables to their token partition bucket + */ - static class 
GroupedSSTableContainer ++ public static class GroupedSSTableContainer + { + private final AbstractStrategyHolder holder; + private final Set<SSTableReader>[] groups; + + private GroupedSSTableContainer(AbstractStrategyHolder holder) + { + this.holder = holder; + Preconditions.checkArgument(holder.numTokenPartitions > 0, "numTokenPartitions not set"); + groups = new Set[holder.numTokenPartitions]; + } + + void add(SSTableReader sstable) + { + Preconditions.checkArgument(holder.managesSSTable(sstable), "this strategy holder doesn't manage %s", sstable); + int idx = holder.router.getIndexForSSTable(sstable); + Preconditions.checkState(idx >= 0 && idx < holder.numTokenPartitions, "Invalid sstable index (%s) for %s", idx, sstable); + if (groups[idx] == null) + groups[idx] = new HashSet<>(); + groups[idx].add(sstable); + } + - int numGroups() ++ public int numGroups() + { + return groups.length; + } + - Set<SSTableReader> getGroup(int i) ++ public Set<SSTableReader> getGroup(int i) + { + Preconditions.checkArgument(i >= 0 && i < groups.length); + Set<SSTableReader> group = groups[i]; + return group != null ? 
group : Collections.emptySet(); + } + + boolean isGroupEmpty(int i) + { + return getGroup(i).isEmpty(); + } + + boolean isEmpty() + { + for (int i = 0; i < groups.length; i++) + if (!isGroupEmpty(i)) + return false; + return true; + } + } + + protected final ColumnFamilyStore cfs; + final DestinationRouter router; + private int numTokenPartitions = -1; + + AbstractStrategyHolder(ColumnFamilyStore cfs, DestinationRouter router) + { + this.cfs = cfs; + this.router = router; + } + + public abstract void startup(); + + public abstract void shutdown(); + + final void setStrategy(CompactionParams params, int numTokenPartitions) + { + Preconditions.checkArgument(numTokenPartitions > 0, "at least one token partition required"); + shutdown(); + this.numTokenPartitions = numTokenPartitions; + setStrategyInternal(params, numTokenPartitions); + } + + protected abstract void setStrategyInternal(CompactionParams params, int numTokenPartitions); + + /** + * SSTables are grouped by their repaired and pending repair status. This method determines if this holder + * holds the sstable for the given repaired/grouped statuses. Holders should be mutually exclusive in the + * groups they deal with. IOW, if one holder returns true for a given isRepaired/isPendingRepair combo, + * none of the others should. 
+ */ + public abstract boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair, boolean isTransient); + + public boolean managesSSTable(SSTableReader sstable) + { + return managesRepairedGroup(sstable.isRepaired(), sstable.isPendingRepair(), sstable.isTransient()); + } + + public abstract AbstractCompactionStrategy getStrategyFor(SSTableReader sstable); + + public abstract Iterable<AbstractCompactionStrategy> allStrategies(); + + public abstract Collection<TaskSupplier> getBackgroundTaskSuppliers(int gcBefore); + + public abstract Collection<AbstractCompactionTask> getMaximalTasks(int gcBefore, boolean splitOutput); + + public abstract Collection<AbstractCompactionTask> getUserDefinedTasks(GroupedSSTableContainer sstables, int gcBefore); + + public GroupedSSTableContainer createGroupedSSTableContainer() + { + return new GroupedSSTableContainer(this); + } + + public abstract void addSSTables(GroupedSSTableContainer sstables); + + public abstract void removeSSTables(GroupedSSTableContainer sstables); + + public abstract void replaceSSTables(GroupedSSTableContainer removed, GroupedSSTableContainer added); + + public abstract List<ISSTableScanner> getScanners(GroupedSSTableContainer sstables, Collection<Range<Token>> ranges); + + + public abstract SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, + long keyCount, + long repairedAt, + UUID pendingRepair, + boolean isTransient, + MetadataCollector collector, + SerializationHeader header, + Collection<Index> indexes, + LifecycleNewTracker lifecycleNewTracker); + + /** + * Return the directory index the given compaction strategy belongs to, or -1 + * if it's not held by this holder + */ + public abstract int getStrategyIndex(AbstractCompactionStrategy strategy); + + public abstract boolean containsSSTable(SSTableReader sstable); +} diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index 2e1fd0e,c80504c..f3c9a66 --- 
a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@@ -33,15 -23,13 +33,12 @@@ import java.util.UUID import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; import java.util.stream.Collectors; -import java.util.stream.Stream; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.google.common.primitives.Ints; - -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import com.google.common.collect.Lists; +import com.google.common.primitives.Longs; - - import org.apache.cassandra.db.compaction.PendingRepairManager.CleanupTask; - import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -49,8 -37,8 +46,10 @@@ import org.apache.cassandra.config.Data import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.DiskBoundaries; -import org.apache.cassandra.db.Memtable; import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.compaction.AbstractStrategyHolder.TaskSupplier; ++import org.apache.cassandra.db.compaction.PendingRepairManager.CleanupTask; ++import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.dht.Range; @@@ -135,10 -102,12 +134,12 @@@ public class CompactionStrategyManager If a user changes the local compaction strategy and then later ALTERs a compaction parameter, we will use the new compaction parameters. 
- **/ + */ private volatile CompactionParams schemaCompactionParams; - private boolean supportsEarlyOpen; - private int fanout; + private volatile boolean supportsEarlyOpen; + private volatile int fanout; + private volatile long maxSSTableSizeBytes; + private volatile String name; public CompactionStrategyManager(ColumnFamilyStore cfs) { @@@ -310,9 -215,12 +311,11 @@@ if (sstable.openReason != SSTableReader.OpenReason.EARLY) compactionStrategyFor(sstable).addSSTable(sstable); } - repaired.forEach(AbstractCompactionStrategy::startup); - unrepaired.forEach(AbstractCompactionStrategy::startup); - supportsEarlyOpen = repaired.get(0).supportsEarlyOpen(); - fanout = (repaired.get(0) instanceof LeveledCompactionStrategy) ? ((LeveledCompactionStrategy) repaired.get(0)).getLevelFanoutSize() : LeveledCompactionStrategy.DEFAULT_LEVEL_FANOUT_SIZE; - name = repaired.get(0).getName(); - maxSSTableSizeBytes = repaired.get(0).getMaxSSTableBytes(); + holders.forEach(AbstractStrategyHolder::startup); + supportsEarlyOpen = repaired.first().supportsEarlyOpen(); + fanout = (repaired.first() instanceof LeveledCompactionStrategy) ? 
((LeveledCompactionStrategy) repaired.first()).getLevelFanoutSize() : LeveledCompactionStrategy.DEFAULT_LEVEL_FANOUT_SIZE; ++ maxSSTableSizeBytes = repaired.first().getMaxSSTableBytes(); ++ name = repaired.first().getName(); } finally { @@@ -362,10 -275,9 +365,9 @@@ * @param sstable * @return */ - @VisibleForTesting int compactionStrategyIndexFor(SSTableReader sstable) { - // should not call maybeReload because it may be called from within lock + // should not call maybeReloadDiskBoundaries because it may be called from within lock readLock.lock(); try { @@@ -568,10 -434,10 +569,10 @@@ readLock.lock(); try { - if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy) + if (repaired.first() instanceof LeveledCompactionStrategy) { - int[] res = new int[LeveledManifest.MAX_LEVEL_COUNT]; + int[] res = new int[LeveledGenerations.MAX_LEVEL_COUNT]; - for (AbstractCompactionStrategy strategy : repaired) + for (AbstractCompactionStrategy strategy : getAllStrategies()) { int[] repairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize(); res = sumArrays(res, repairedCountPerLevel); @@@ -601,201 -472,135 +602,175 @@@ return res; } - public Directories getDirectories() - { - maybeReloadDiskBoundaries(); - readLock.lock(); - try - { - assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass()); - return repaired.get(0).getDirectories(); - } - finally - { - readLock.unlock(); - } - } - + /** + * Should only be called holding the readLock + */ private void handleFlushNotification(Iterable<SSTableReader> added) { - // If reloaded, SSTables will be placed in their correct locations - // so there is no need to process notification - if (maybeReloadDiskBoundaries()) - return; - - readLock.lock(); - try - { - for (SSTableReader sstable : added) - compactionStrategyFor(sstable).addSSTable(sstable); - } - finally - { - readLock.unlock(); - } + for (SSTableReader sstable : added) + 
compactionStrategyFor(sstable).addSSTable(sstable); } - /** - * Should only be called holding the readLock - */ - private void handleListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed) + private int getHolderIndex(SSTableReader sstable) { - // a bit of gymnastics to be able to replace sstables in compaction strategies - // we use this to know that a compaction finished and where to start the next compaction in LCS - int locationSize = partitionSSTablesByTokenRange? currentBoundaries.directories.size() : 1; + for (int i = 0; i < holders.size(); i++) + { + if (holders.get(i).managesSSTable(sstable)) + return i; + } - List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize); - List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize); - List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize); - List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize); + throw new IllegalStateException("No holder claimed " + sstable); + } - for (int i = 0; i < locationSize; i++) + private AbstractStrategyHolder getHolder(SSTableReader sstable) + { + for (AbstractStrategyHolder holder : holders) { - repairedRemoved.add(new HashSet<>()); - repairedAdded.add(new HashSet<>()); - unrepairedRemoved.add(new HashSet<>()); - unrepairedAdded.add(new HashSet<>()); + if (holder.managesSSTable(sstable)) + return holder; } - for (SSTableReader sstable : removed) + throw new IllegalStateException("No holder claimed " + sstable); + } + + private AbstractStrategyHolder getHolder(long repairedAt, UUID pendingRepair, boolean isTransient) + { + return getHolder(repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE, + pendingRepair != ActiveRepairService.NO_PENDING_REPAIR, + isTransient); + } + + @VisibleForTesting + AbstractStrategyHolder getHolder(boolean isRepaired, boolean isPendingRepair, boolean isTransient) + { + for (AbstractStrategyHolder holder : holders) { - int i = 
compactionStrategyIndexFor(sstable); - if (sstable.isRepaired()) - repairedRemoved.get(i).add(sstable); - else - unrepairedRemoved.get(i).add(sstable); + if (holder.managesRepairedGroup(isRepaired, isPendingRepair, isTransient)) + return holder; } - for (SSTableReader sstable : added) + + throw new IllegalStateException(String.format("No holder claimed isPendingRepair: %s, isPendingRepair %s", + isRepaired, isPendingRepair)); + } + + @VisibleForTesting + ImmutableList<AbstractStrategyHolder> getHolders() + { + return holders; + } + + /** + * Split sstables into a list of grouped sstable containers, the list index an sstable + * + * lives in matches the list index of the holder that's responsible for it + */ - @VisibleForTesting - List<GroupedSSTableContainer> groupSSTables(Iterable<SSTableReader> sstables) ++ public List<GroupedSSTableContainer> groupSSTables(Iterable<SSTableReader> sstables) + { + List<GroupedSSTableContainer> classified = new ArrayList<>(holders.size()); + for (AbstractStrategyHolder holder : holders) { - int i = compactionStrategyIndexFor(sstable); - if (sstable.isRepaired()) - repairedAdded.get(i).add(sstable); - else - unrepairedAdded.get(i).add(sstable); + classified.add(holder.createGroupedSSTableContainer()); } - for (int i = 0; i < locationSize; i++) + + for (SSTableReader sstable : sstables) { - if (!repairedRemoved.get(i).isEmpty()) - repaired.get(i).replaceSSTables(repairedRemoved.get(i), repairedAdded.get(i)); - else - repaired.get(i).addSSTables(repairedAdded.get(i)); + classified.get(getHolderIndex(sstable)).add(sstable); + } - if (!unrepairedRemoved.get(i).isEmpty()) - unrepaired.get(i).replaceSSTables(unrepairedRemoved.get(i), unrepairedAdded.get(i)); - else - unrepaired.get(i).addSSTables(unrepairedAdded.get(i)); + return classified; + } + ++ /** ++ * Should only be called holding the readLock ++ */ + private void handleListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed) + { - // If reloaded, 
SSTables will be placed in their correct locations - // so there is no need to process notification - if (maybeReloadDiskBoundaries()) - return; - - readLock.lock(); - try ++ List<GroupedSSTableContainer> addedGroups = groupSSTables(added); ++ List<GroupedSSTableContainer> removedGroups = groupSSTables(removed); ++ for (int i=0; i<holders.size(); i++) + { - List<GroupedSSTableContainer> addedGroups = groupSSTables(added); - List<GroupedSSTableContainer> removedGroups = groupSSTables(removed); - for (int i=0; i<holders.size(); i++) - { - holders.get(i).replaceSSTables(removedGroups.get(i), addedGroups.get(i)); - } - } - finally - { - readLock.unlock(); ++ holders.get(i).replaceSSTables(removedGroups.get(i), addedGroups.get(i)); } } ++ /** ++ * Should only be called holding the readLock ++ */ private void handleRepairStatusChangedNotification(Iterable<SSTableReader> sstables) { - // If reloaded, SSTables will be placed in their correct locations - // so there is no need to process notification - if (maybeReloadDiskBoundaries()) - return; - // we need a write lock here since we move sstables from one strategy instance to another - readLock.lock(); - try - for (SSTableReader sstable : sstables) ++ List<GroupedSSTableContainer> groups = groupSSTables(sstables); ++ for (int i = 0; i < holders.size(); i++) { - List<GroupedSSTableContainer> groups = groupSSTables(sstables); - for (int i = 0; i < holders.size(); i++) - int index = compactionStrategyIndexFor(sstable); - if (sstable.isRepaired()) -- { - GroupedSSTableContainer group = groups.get(i); - unrepaired.get(index).removeSSTable(sstable); - repaired.get(index).addSSTable(sstable); - } - else ++ GroupedSSTableContainer group = groups.get(i); + - if (group.isEmpty()) - continue; ++ if (group.isEmpty()) ++ continue; + - AbstractStrategyHolder dstHolder = holders.get(i); - - for (AbstractStrategyHolder holder : holders) - { - if (holder != dstHolder) - holder.removeSSTables(group); - } - - // adding sstables into another 
strategy may change its level, - // thus it won't be removed from original LCS. We have to remove sstables first - dstHolder.addSSTables(group); ++ AbstractStrategyHolder dstHolder = holders.get(i); ++ for (AbstractStrategyHolder holder : holders) + { - repaired.get(index).removeSSTable(sstable); - unrepaired.get(index).addSSTable(sstable); ++ if (holder != dstHolder) ++ holder.removeSSTables(group); } - } - finally - { - readLock.unlock(); ++ ++ // adding sstables into another strategy may change its level, ++ // thus it won't be removed from original LCS. We have to remove sstables first ++ dstHolder.addSSTables(group); } + } ++ /** ++ * Should only be called holding the readLock ++ */ + private void handleMetadataChangedNotification(SSTableReader sstable, StatsMetadata oldMetadata) + { + AbstractCompactionStrategy acs = getCompactionStrategyFor(sstable); + acs.metadataChanged(oldMetadata, sstable); } ++ /** ++ * Should only be called holding the readLock ++ */ private void handleDeletingNotification(SSTableReader deleted) { - // If reloaded, SSTables will be placed in their correct locations - // so there is no need to process notification - if (maybeReloadDiskBoundaries()) - return; - readLock.lock(); - try - { - compactionStrategyFor(deleted).removeSSTable(deleted); - } - finally - { - readLock.unlock(); - } + compactionStrategyFor(deleted).removeSSTable(deleted); } public void handleNotification(INotification notification, Object sender) { - if (notification instanceof SSTableAddedNotification) - { - handleFlushNotification(((SSTableAddedNotification) notification).added); - } - else if (notification instanceof SSTableListChangedNotification) - { - SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification; - handleListChangedNotification(listChangedNotification.added, listChangedNotification.removed); - } - else if (notification instanceof SSTableRepairStatusChanged) - { - 
handleRepairStatusChangedNotification(((SSTableRepairStatusChanged) notification).sstables); - } - else if (notification instanceof SSTableDeletingNotification) + // we might race with reload adding/removing the sstables, this means that compaction strategies + // must handle double notifications. + maybeReloadDiskBoundaries(); + readLock.lock(); + try { - handleDeletingNotification(((SSTableDeletingNotification) notification).deleting); ++ + if (notification instanceof SSTableAddedNotification) + { + handleFlushNotification(((SSTableAddedNotification) notification).added); + } + else if (notification instanceof SSTableListChangedNotification) + { + SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification; + handleListChangedNotification(listChangedNotification.added, listChangedNotification.removed); + } + else if (notification instanceof SSTableRepairStatusChanged) + { + handleRepairStatusChangedNotification(((SSTableRepairStatusChanged) notification).sstables); + } + else if (notification instanceof SSTableDeletingNotification) + { + handleDeletingNotification(((SSTableDeletingNotification) notification).deleting); + } ++ else if (notification instanceof SSTableMetadataChanged) ++ { ++ SSTableMetadataChanged lcNotification = (SSTableMetadataChanged) notification; ++ handleMetadataChangedNotification(lcNotification.sstable, lcNotification.oldMetadata); ++ } } - else if (notification instanceof SSTableMetadataChanged) + finally { - SSTableMetadataChanged lcNotification = (SSTableMetadataChanged) notification; - handleMetadataChangedNotification(lcNotification.sstable, lcNotification.oldMetadata); + readLock.unlock(); } } @@@ -835,45 -649,47 +810,45 @@@ * @return */ @SuppressWarnings("resource") - public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges) + public AbstractCompactionStrategy.ScannerList maybeGetScanners(Collection<SSTableReader> 
sstables, Collection<Range<Token>> ranges) { maybeReloadDiskBoundaries(); - readLock.lock(); + List<ISSTableScanner> scanners = new ArrayList<>(sstables.size()); + readLock.lock(); try { - assert repaired.size() == unrepaired.size(); - List<Set<SSTableReader>> repairedSSTables = new ArrayList<>(); - List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>(); + List<GroupedSSTableContainer> sstableGroups = groupSSTables(sstables); - for (int i = 0; i < repaired.size(); i++) + for (int i = 0; i < holders.size(); i++) { - repairedSSTables.add(new HashSet<>()); - unrepairedSSTables.add(new HashSet<>()); - } - - for (SSTableReader sstable : sstables) - { - if (sstable.isRepaired()) - repairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable); - else - unrepairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable); + AbstractStrategyHolder holder = holders.get(i); + GroupedSSTableContainer group = sstableGroups.get(i); + scanners.addAll(holder.getScanners(group, ranges)); } + } + catch (PendingRepairManager.IllegalSSTableArgumentException e) + { + ISSTableScanner.closeAllAndPropagate(scanners, new ConcurrentModificationException(e)); + } + finally + { + readLock.unlock(); + } + return new AbstractCompactionStrategy.ScannerList(scanners); + } - List<ISSTableScanner> scanners = new ArrayList<>(sstables.size()); - for (int i = 0; i < repairedSSTables.size(); i++) + public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges) + { + while (true) + { + try { - if (!repairedSSTables.get(i).isEmpty()) - scanners.addAll(repaired.get(i).getScanners(repairedSSTables.get(i), ranges).scanners); + return maybeGetScanners(sstables, ranges); } - for (int i = 0; i < unrepairedSSTables.size(); i++) + catch (ConcurrentModificationException e) { - if (!unrepairedSSTables.get(i).isEmpty()) - scanners.addAll(unrepaired.get(i).getScanners(unrepairedSSTables.get(i), ranges).scanners); + 
logger.debug("SSTable repairedAt/pendingRepaired values changed while getting scanners"); } - - return new AbstractCompactionStrategy.ScannerList(scanners); - } - finally - { - readLock.unlock(); } } diff --cc src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index 74ffccb,77cb223..dd7c9df --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@@ -21,7 -21,7 +21,6 @@@ import java.util.* import com.google.common.annotations.VisibleForTesting; --import com.google.common.base.Joiner; import com.google.common.collect.*; import com.google.common.primitives.Doubles; @@@ -348,16 -340,15 +347,22 @@@ public class LeveledCompactionStrategy } @Override + public void metadataChanged(StatsMetadata oldMetadata, SSTableReader sstable) + { + if (sstable.getSSTableLevel() != oldMetadata.sstableLevel) + manifest.newLevel(sstable, oldMetadata.sstableLevel); + } + + @Override + public void addSSTables(Iterable<SSTableReader> sstables) + { + manifest.addSSTables(sstables); + } + + @Override public void addSSTable(SSTableReader added) { - manifest.add(added); + manifest.addSSTables(Collections.singleton(added)); } @Override diff --cc src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java index 0000000,f7087f0..bc83275 mode 000000,100644..100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java @@@ -1,0 -1,311 +1,310 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.db.compaction; + + import java.io.IOException; + import java.util.Collection; + import java.util.Collections; + import java.util.Comparator; + import java.util.HashSet; + import java.util.Iterator; + import java.util.Set; + import java.util.TreeSet; + import java.util.concurrent.TimeUnit; + + import com.google.common.collect.ImmutableSet; + import com.google.common.collect.Iterators; + import com.google.common.collect.PeekingIterator; + import com.google.common.primitives.Ints; + + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import org.apache.cassandra.config.Config; + import org.apache.cassandra.io.sstable.format.SSTableReader; + import org.apache.cassandra.utils.FBUtilities; + + /** + * Handles the leveled manifest generations + * + * Not thread safe, all access should be synchronized in LeveledManifest + */ + class LeveledGenerations + { + private static final Logger logger = LoggerFactory.getLogger(LeveledGenerations.class); + private final boolean strictLCSChecksTest = Boolean.getBoolean(Config.PROPERTY_PREFIX + "test.strict_lcs_checks"); + // allocate enough generations for a PB of data, with a 1-MB sstable size. (Note that if maxSSTableSize is + // updated, we will still have sstables of the older, potentially smaller size. So don't make this + // dependent on maxSSTableSize.) 
+ static final int MAX_LEVEL_COUNT = (int) Math.log10(1000 * 1000 * 1000); + + private final Set<SSTableReader> l0 = new HashSet<>(); + private static long lastOverlapCheck = System.nanoTime(); + // note that since l0 is broken out, levels[0] represents L1: + private final TreeSet<SSTableReader> [] levels = new TreeSet[MAX_LEVEL_COUNT - 1]; + + private static final Comparator<SSTableReader> nonL0Comparator = (o1, o2) -> { + int cmp = SSTableReader.sstableComparator.compare(o1, o2); + if (cmp == 0) + cmp = Ints.compare(o1.descriptor.generation, o2.descriptor.generation); + return cmp; + }; + + LeveledGenerations() + { + for (int i = 0; i < MAX_LEVEL_COUNT - 1; i++) + levels[i] = new TreeSet<>(nonL0Comparator); + } + + Set<SSTableReader> get(int level) + { + if (level > levelCount() - 1 || level < 0) + throw new ArrayIndexOutOfBoundsException("Invalid generation " + level + " - maximum is " + (levelCount() - 1)); + if (level == 0) + return l0; + return levels[level - 1]; + } + + int levelCount() + { + return levels.length + 1; + } + + /** + * Adds readers to the correct level + * + * If adding an sstable would cause an overlap in the level (if level > 1) we send it to L0. This can happen + * for example when moving sstables from unrepaired to repaired. + * + * If the sstable is already in the manifest we skip it. 
+ * + * If the sstable exists in the manifest but has the wrong level, it is removed from the wrong level and added to the correct one + * + * todo: group sstables per level, add all if level is currently empty, improve startup speed + */ + void addAll(Iterable<SSTableReader> readers) + { + logDistribution(); + for (SSTableReader sstable : readers) + { + assert sstable.getSSTableLevel() < levelCount() : "Invalid level " + sstable.getSSTableLevel() + " out of " + (levelCount() - 1); + int existingLevel = getLevelIfExists(sstable); + if (existingLevel != -1) + { + if (sstable.getSSTableLevel() != existingLevel) + { + logger.error("SSTable {} on the wrong level in the manifest - {} instead of {} as recorded in the sstable metadata, removing from level {}", sstable, existingLevel, sstable.getSSTableLevel(), existingLevel); + if (strictLCSChecksTest) + throw new AssertionError("SSTable not in matching level in manifest: "+sstable + ": "+existingLevel+" != " + sstable.getSSTableLevel()); + get(existingLevel).remove(sstable); + } + else + { + logger.info("Manifest already contains {} in level {} - skipping", sstable, existingLevel); + continue; + } + } + + if (sstable.getSSTableLevel() == 0) + { + l0.add(sstable); + continue; + } + + TreeSet<SSTableReader> level = levels[sstable.getSSTableLevel() - 1]; + /* + current level: |-----||----||----| |---||---| + new sstable: |--| + ^ before + ^ after + overlap if before.last >= newsstable.first or after.first <= newsstable.last + */ + SSTableReader after = level.ceiling(sstable); + SSTableReader before = level.floor(sstable); + + if (before != null && before.last.compareTo(sstable.first) >= 0 || + after != null && after.first.compareTo(sstable.last) <= 0) + { + if (strictLCSChecksTest) // we can only assert this in tests since this is normal when for example moving sstables from unrepaired to repaired + throw new AssertionError("Got unexpected overlap in level "+sstable.getSSTableLevel()); + sendToL0(sstable); + } + else + { + 
level.add(sstable); + } + } + maybeVerifyLevels(); + } + + /** + * Sends sstable to L0 by mutating its level in the sstable metadata. + * + * SSTable should not exist in the manifest + */ + private void sendToL0(SSTableReader sstable) + { + try + { - sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 0); - sstable.reloadSSTableMetadata(); ++ sstable.mutateLevelAndReload(0); + } + catch (IOException e) + { + // Adding it to L0 and marking suspect is probably the best we can do here - it won't create overlap + // and we won't pick it for later compactions. + logger.error("Failed mutating sstable metadata for {} - adding it to L0 to avoid overlap. Marking suspect", sstable, e); + sstable.markSuspect(); + } + l0.add(sstable); + } + + /** + * Tries to find the sstable in the levels without using the sstable-recorded level + * + * Used to make sure we don't try to re-add an existing sstable + */ + private int getLevelIfExists(SSTableReader sstable) + { + for (int i = 0; i < levelCount(); i++) + { + if (get(i).contains(sstable)) + return i; + } + return -1; + } + + int remove(Collection<SSTableReader> readers) + { + int minLevel = Integer.MAX_VALUE; + for (SSTableReader sstable : readers) + { + int level = sstable.getSSTableLevel(); + minLevel = Math.min(minLevel, level); + get(level).remove(sstable); + } + return minLevel; + } + + int[] getAllLevelSize() + { + int[] counts = new int[levelCount()]; + for (int i = 0; i < levelCount(); i++) + counts[i] = get(i).size(); + return counts; + } + + Set<SSTableReader> allSSTables() + { + ImmutableSet.Builder<SSTableReader> builder = ImmutableSet.builder(); + builder.addAll(l0); + for (Set<SSTableReader> sstables : levels) + builder.addAll(sstables); + return builder.build(); + } + + /** + * given a level with sstables with first tokens [0, 10, 20, 30] and a lastCompactedSSTable with last = 15, we will + * return an Iterator over [20, 30, 0, 10]. 
+ */ + Iterator<SSTableReader> wrappingIterator(int lvl, SSTableReader lastCompactedSSTable) + { + assert lvl > 0; // only makes sense in L1+ + TreeSet<SSTableReader> level = levels[lvl - 1]; + if (level.isEmpty()) + return Collections.emptyIterator(); + if (lastCompactedSSTable == null) + return level.iterator(); + + PeekingIterator<SSTableReader> tail = Iterators.peekingIterator(level.tailSet(lastCompactedSSTable).iterator()); + SSTableReader pivot = null; + // then we need to make sure that the first token of the pivot is greater than the last token of the lastCompactedSSTable + while (tail.hasNext()) + { + SSTableReader potentialPivot = tail.peek(); + if (potentialPivot.first.compareTo(lastCompactedSSTable.last) > 0) + { + pivot = potentialPivot; + break; + } + tail.next(); + } + + if (pivot == null) + return level.iterator(); + + return Iterators.concat(tail, level.headSet(pivot, false).iterator()); + } + + void logDistribution() + { + if (logger.isTraceEnabled()) + { + for (int i = 0; i < levelCount(); i++) + { + Set<SSTableReader> level = get(i); + if (!level.isEmpty()) + { + logger.trace("L{} contains {} SSTables ({}) in {}", + i, + level.size(), + FBUtilities.prettyPrintMemory(SSTableReader.getTotalBytes(level)), + this); + } + } + } + } + + Set<SSTableReader>[] snapshot() + { + Set<SSTableReader> [] levelsCopy = new Set[levelCount()]; + for (int i = 0; i < levelCount(); i++) + levelsCopy[i] = ImmutableSet.copyOf(get(i)); + return levelsCopy; + } + + /** + * do extra verification of the sstables in the generations + * + * only used during tests + */ + private void maybeVerifyLevels() + { + if (!strictLCSChecksTest || System.nanoTime() - lastOverlapCheck <= TimeUnit.NANOSECONDS.convert(5, TimeUnit.SECONDS)) + return; + logger.info("LCS verifying levels"); + lastOverlapCheck = System.nanoTime(); + for (int i = 1; i < levelCount(); i++) + { + SSTableReader prev = null; + for (SSTableReader sstable : get(i)) + { + // no overlap: + assert prev == null || 
prev.last.compareTo(sstable.first) < 0; + prev = sstable; + // make sure it does not exist in any other level: + for (int j = 0; j < levelCount(); j++) + { + if (i == j) + continue; + assert !get(j).contains(sstable); + } + } + } + } + } diff --cc src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 37c9435,d630730..4f70f64 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@@ -217,76 -151,9 +150,8 @@@ public class LeveledManifes if (logger.isTraceEnabled()) logger.trace("Adding [{}]", toString(added)); - - for (SSTableReader ssTableReader : added) - add(ssTableReader); - lastCompactedKeys[minLevel] = SSTableReader.sstableOrdering.max(added).last; - } - - public synchronized void repairOverlappingSSTables(int level) - { - SSTableReader previous = null; - Collections.sort(generations[level], SSTableReader.sstableComparator); - List<SSTableReader> outOfOrderSSTables = new ArrayList<>(); - for (SSTableReader current : generations[level]) - { - if (previous != null && current.first.compareTo(previous.last) <= 0) - { - logger.warn("At level {}, {} [{}, {}] overlaps {} [{}, {}]. This could be caused by a bug in Cassandra 1.1.0 .. 1.1.3 or due to the fact that you have dropped sstables from another node into the data directory. " + - "Sending back to L0. If you didn't drop in sstables, and have not yet run scrub, you should do so since you may also have rows out-of-order within an sstable", - level, previous, previous.first, previous.last, current, current.first, current.last); - outOfOrderSSTables.add(current); - } - else - { - previous = current; - } - } - - if (!outOfOrderSSTables.isEmpty()) - { - for (SSTableReader sstable : outOfOrderSSTables) - sendBackToL0(sstable); - } - } - - /** - * Checks if adding the sstable creates an overlap in the level - * @param sstable the sstable to add - * @return true if it is safe to add the sstable in the level. 
- */ - private boolean canAddSSTable(SSTableReader sstable) - { - int level = sstable.getSSTableLevel(); - if (level == 0) - return true; - - List<SSTableReader> copyLevel = new ArrayList<>(generations[level]); - copyLevel.add(sstable); - Collections.sort(copyLevel, SSTableReader.sstableComparator); - - SSTableReader previous = null; - for (SSTableReader current : copyLevel) - { - if (previous != null && current.first.compareTo(previous.last) <= 0) - return false; - previous = current; - } - return true; - } -- - private synchronized void sendBackToL0(SSTableReader sstable) - { - remove(sstable); - try - { - sstable.mutateLevelAndReload(0); - add(sstable); - } - catch (IOException e) - { - throw new RuntimeException("Could not reload sstable meta data", e); - } + generations.addAll(added); + lastCompactedSSTables[minLevel] = SSTableReader.sstableOrdering.max(added); } private String toString(Collection<SSTableReader> sstables) @@@ -364,14 -231,9 +229,14 @@@ // This isn't a magic wand -- if you are consistently writing too fast for LCS to keep // up, you're still screwed. But if instead you have intermittent bursts of activity, // it can help a lot. + + // Let's check that L0 is far enough behind to warrant STCS. 
+ // If it is, it will be used before proceeding any of higher level + CompactionCandidate l0Compaction = getSTCSInL0CompactionCandidate(); + - for (int i = generations.length - 1; i > 0; i--) + for (int i = generations.levelCount() - 1; i > 0; i--) { - List<SSTableReader> sstables = getLevel(i); + Set<SSTableReader> sstables = generations.get(i); if (sstables.isEmpty()) continue; // mostly this just avoids polluting the debug log with zero scores // we want to calculate score excluding compacting ones @@@ -394,7 -257,7 +259,7 @@@ candidates = getOverlappingStarvedSSTables(nextLevel, candidates); if (logger.isTraceEnabled()) logger.trace("Compaction candidates for L{} are {}", i, toString(candidates)); -- return new CompactionCandidate(candidates, nextLevel, cfs.getCompactionStrategyManager().getMaxSSTableBytes()); ++ return new CompactionCandidate(candidates, nextLevel, maxSSTableSizeInBytes); } else { @@@ -764,9 -579,9 +582,7 @@@ @VisibleForTesting List<SSTableReader> ageSortedSSTables(Collection<SSTableReader> candidates) { - List<SSTableReader> ageSortedCandidates = new ArrayList<>(candidates); - Collections.sort(ageSortedCandidates, SSTableReader.maxTimestampAscending); - return ageSortedCandidates; - List<SSTableReader> copy = new ArrayList<>(candidates); - copy.sort(SSTableReader.maxTimestampAscending); - return ImmutableList.copyOf(copy); ++ return ImmutableList.sortedCopyOf(SSTableReader.maxTimestampAscending, candidates); } public synchronized Set<SSTableReader>[] getSStablesPerLevelSnapshot() @@@ -818,13 -618,6 +619,13 @@@ tasks += estimated[i]; } - if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > cfs.getMaximumCompactionThreshold()) ++ if (!DatabaseDescriptor.getDisableSTCSInL0() && generations.get(0).size() > cfs.getMaximumCompactionThreshold()) + { - int l0compactions = getLevel(0).size() / cfs.getMaximumCompactionThreshold(); ++ int l0compactions = generations.get(0).size() / cfs.getMaximumCompactionThreshold(); + tasks += 
l0compactions; + estimated[0] += l0compactions; + } + logger.trace("Estimating {} compactions to do for {}.{}", Arrays.toString(estimated), cfs.keyspace.getName(), cfs.name); return Ints.checkedCast(tasks); @@@ -851,25 -644,18 +652,26 @@@ assert newLevel > 0; } return newLevel; + } + synchronized Set<SSTableReader> getLevel(int level) + { + return ImmutableSet.copyOf(generations.get(level)); } - public Iterable<SSTableReader> getAllSSTables() + synchronized List<SSTableReader> getLevelSorted(int level, Comparator<SSTableReader> comparator) { - Set<SSTableReader> sstables = new HashSet<>(); - for (List<SSTableReader> generation : generations) - { - sstables.addAll(generation); - } - return sstables; - List<SSTableReader> copy = new ArrayList<>(generations.get(level)); - copy.sort(comparator); - return ImmutableList.copyOf(copy); ++ return ImmutableList.sortedCopyOf(comparator, generations.get(level)); + } + - public synchronized void newLevel(SSTableReader sstable, int oldLevel) ++ synchronized void newLevel(SSTableReader sstable, int oldLevel) + { - boolean removed = generations[oldLevel].remove(sstable); - assert removed : "Could not remove " + sstable +" from " + oldLevel; - add(sstable); - lastCompactedKeys[oldLevel] = sstable.last; ++ boolean removed = generations.get(oldLevel).remove(sstable); ++ // if reload races with the metadataChanged notification the sstable might already be removed ++ if (!removed) ++ logger.warn("Could not remove "+sstable+" from "+oldLevel); ++ generations.addAll(Collections.singleton(sstable)); ++ lastCompactedSSTables[oldLevel] = sstable; } public static class CompactionCandidate diff --cc src/java/org/apache/cassandra/tools/StandaloneScrubber.java index d9d8db1,2a4f293..e2fedad --- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java +++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java @@@ -245,30 -245,19 +245,16 @@@ public class StandaloneScrubbe if 
(strategyManager.getCompactionParams().klass().equals(LeveledCompactionStrategy.class)) { int maxSizeInMB = (int)((cfs.getCompactionStrategyManager().getMaxSSTableBytes()) / (1024L * 1024L)); - - System.out.println("Checking leveled manifest"); - Predicate<SSTableReader> repairedPredicate = new Predicate<SSTableReader>() + int fanOut = cfs.getCompactionStrategyManager().getLevelFanoutSize(); - List<SSTableReader> repaired = new ArrayList<>(); - List<SSTableReader> unrepaired = new ArrayList<>(); - - for (SSTableReader sstable : sstables) ++ for (AbstractStrategyHolder.GroupedSSTableContainer sstableGroup : strategyManager.groupSSTables(sstables)) { - @Override - public boolean apply(SSTableReader sstable) - if (sstable.isRepaired()) - repaired.add(sstable); - else - unrepaired.add(sstable); ++ for (int i = 0; i < sstableGroup.numGroups(); i++) + { - return sstable.isRepaired(); ++ List<SSTableReader> groupSSTables = new ArrayList<>(sstableGroup.getGroup(i)); ++ // creating the manifest makes sure the leveling is sane: ++ LeveledManifest.create(cfs, maxSizeInMB, fanOut, groupSSTables); + } - }; - - List<SSTableReader> repaired = Lists.newArrayList(Iterables.filter(sstables, repairedPredicate)); - List<SSTableReader> unRepaired = Lists.newArrayList(Iterables.filter(sstables, Predicates.not(repairedPredicate))); - - LeveledManifest repairedManifest = LeveledManifest.create(cfs, maxSizeInMB, cfs.getLevelFanoutSize(), repaired); - for (int i = 1; i < repairedManifest.getLevelCount(); i++) - { - repairedManifest.repairOverlappingSSTables(i); - } - LeveledManifest unRepairedManifest = LeveledManifest.create(cfs, maxSizeInMB, cfs.getLevelFanoutSize(), unRepaired); - for (int i = 1; i < unRepairedManifest.getLevelCount(); i++) - { - unRepairedManifest.repairOverlappingSSTables(i); } - LeveledManifest.create(cfs, maxSizeInMB, fanOut, repaired); - LeveledManifest.create(cfs, maxSizeInMB, fanOut, unrepaired); } } diff --cc 
test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index 8a8ed13,1a3ac44..a7a5396 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@@ -29,8 -29,9 +30,9 @@@ import java.util.List import java.util.Map; import java.util.Random; import java.util.UUID; + import java.util.stream.Collectors; -import junit.framework.Assert; +import org.junit.Assert; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@@ -365,26 -353,24 +364,24 @@@ public class LeveledCompactionStrategyT // we only have unrepaired sstables: assertEquals(sstableCount, cfs.getLiveSSTables().size()); - SSTableReader sstable1 = unrepaired.manifest.generations[2].get(0); - SSTableReader sstable2 = unrepaired.manifest.generations[1].get(0); + SSTableReader sstable1 = unrepaired.manifest.getLevel(2).iterator().next(); + SSTableReader sstable2 = unrepaired.manifest.getLevel(1).iterator().next(); - sstable1.descriptor.getMetadataSerializer().mutateRepairedAt(sstable1.descriptor, System.currentTimeMillis()); + sstable1.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable1.descriptor, System.currentTimeMillis(), null, false); sstable1.reloadSSTableMetadata(); assertTrue(sstable1.isRepaired()); manager.handleNotification(new SSTableRepairStatusChanged(Arrays.asList(sstable1)), this); - int repairedSSTableCount = 0; - for (List<SSTableReader> level : repaired.manifest.generations) - repairedSSTableCount += level.size(); + int repairedSSTableCount = repaired.manifest.getSSTables().size(); assertEquals(1, repairedSSTableCount); // make sure the repaired sstable ends up in the same level in the repaired manifest: - assertTrue(repaired.manifest.generations[2].contains(sstable1)); + assertTrue(repaired.manifest.getLevel(2).contains(sstable1)); // and that it is gone from unrepaired - 
assertFalse(unrepaired.manifest.generations[2].contains(sstable1)); + assertFalse(unrepaired.manifest.getLevel(2).contains(sstable1)); unrepaired.removeSSTable(sstable2); - manager.handleNotification(new SSTableAddedNotification(singleton(sstable2)), this); + manager.handleNotification(new SSTableAddedNotification(singleton(sstable2), null), this); assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2)); assertFalse(repaired.manifest.getLevel(1).contains(sstable2)); } @@@ -548,68 -482,216 +545,282 @@@ } @Test + public void testDisableSTCSInL0() throws IOException + { + /* + First creates a bunch of sstables in L1, then overloads L0 with 50 sstables. Now with STCS in L0 enabled + we should get a compaction task where the target level is 0, then we disable STCS-in-L0 and make sure that + the compaction task we get targets L1 or higher. + */ + ColumnFamilyStore cfs = MockSchema.newCFS(); + Map<String, String> localOptions = new HashMap<>(); + localOptions.put("class", "LeveledCompactionStrategy"); + localOptions.put("sstable_size_in_mb", "1"); + cfs.setCompactionParameters(localOptions); + List<SSTableReader> sstables = new ArrayList<>(); + for (int i = 0; i < 11; i++) + { + SSTableReader l1sstable = MockSchema.sstable(i, 1 * 1024 * 1024, cfs); + l1sstable.descriptor.getMetadataSerializer().mutateLevel(l1sstable.descriptor, 1); + l1sstable.reloadSSTableMetadata(); + sstables.add(l1sstable); + } + + for (int i = 100; i < 150; i++) + sstables.add(MockSchema.sstable(i, 1 * 1024 * 1024, cfs)); + + cfs.disableAutoCompaction(); + cfs.addSSTables(sstables); + assertEquals(0, getTaskLevel(cfs)); + + try + { + CompactionManager.instance.setDisableSTCSInL0(true); + assertTrue(getTaskLevel(cfs) > 0); + } + finally + { + CompactionManager.instance.setDisableSTCSInL0(false); + } + } + + private int getTaskLevel(ColumnFamilyStore cfs) + { + int level = -1; + for (List<AbstractCompactionStrategy> strategies : cfs.getCompactionStrategyManager().getStrategies()) + { + for 
(AbstractCompactionStrategy strategy : strategies) + { + AbstractCompactionTask task = strategy.getNextBackgroundTask(0); + if (task != null) + { + try + { + assertTrue(task instanceof LeveledCompactionTask); + LeveledCompactionTask lcsTask = (LeveledCompactionTask) task; + level = Math.max(level, lcsTask.getLevel()); + } + finally + { + task.transaction.abort(); + } + } + } + } + return level; + } ++ ++ @Test + public void testAddingOverlapping() + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions()); + List<SSTableReader> currentLevel = new ArrayList<>(); + int gen = 1; + currentLevel.add(MockSchema.sstableWithLevel(gen++, 10, 20, 1, cfs)); + currentLevel.add(MockSchema.sstableWithLevel(gen++, 21, 30, 1, cfs)); + currentLevel.add(MockSchema.sstableWithLevel(gen++, 51, 100, 1, cfs)); + currentLevel.add(MockSchema.sstableWithLevel(gen++, 80, 120, 1, cfs)); + currentLevel.add(MockSchema.sstableWithLevel(gen++, 90, 150, 1, cfs)); + + lm.addSSTables(currentLevel); + assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3)); + assertLevelsEqual(lm.getLevel(0), currentLevel.subList(3, 5)); + + List<SSTableReader> newSSTables = new ArrayList<>(); + // this sstable last token is the same as the first token of L1 above, should get sent to L0: + newSSTables.add(MockSchema.sstableWithLevel(gen++, 5, 10, 1, cfs)); + lm.addSSTables(newSSTables); + assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3)); + assertEquals(0, newSSTables.get(0).getSSTableLevel()); + assertTrue(lm.getLevel(0).containsAll(newSSTables)); + + newSSTables.clear(); + newSSTables.add(MockSchema.sstableWithLevel(gen++, 30, 40, 1, cfs)); + lm.addSSTables(newSSTables); + assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3)); + assertEquals(0, newSSTables.get(0).getSSTableLevel()); + assertTrue(lm.getLevel(0).containsAll(newSSTables)); + + newSSTables.clear(); + 
newSSTables.add(MockSchema.sstableWithLevel(gen++, 100, 140, 1, cfs)); + lm.addSSTables(newSSTables); + assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3)); + assertEquals(0, newSSTables.get(0).getSSTableLevel()); + assertTrue(lm.getLevel(0).containsAll(newSSTables)); + + newSSTables.clear(); + newSSTables.add(MockSchema.sstableWithLevel(gen++, 100, 140, 1, cfs)); + newSSTables.add(MockSchema.sstableWithLevel(gen++, 120, 140, 1, cfs)); + lm.addSSTables(newSSTables); + List<SSTableReader> newL1 = new ArrayList<>(currentLevel.subList(0, 3)); + newL1.add(newSSTables.get(1)); + assertLevelsEqual(lm.getLevel(1), newL1); + newSSTables.remove(1); + assertTrue(newSSTables.stream().allMatch(s -> s.getSSTableLevel() == 0)); + assertTrue(lm.getLevel(0).containsAll(newSSTables)); + } + + @Test + public void singleTokenSSTableTest() + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions()); + List<SSTableReader> expectedL1 = new ArrayList<>(); + + int gen = 1; + // single sstable, single token (100) + expectedL1.add(MockSchema.sstableWithLevel(gen++, 100, 100, 1, cfs)); + lm.addSSTables(expectedL1); + + List<SSTableReader> expectedL0 = new ArrayList<>(); + + // should get moved to L0: + expectedL0.add(MockSchema.sstableWithLevel(gen++, 99, 101, 1, cfs)); + expectedL0.add(MockSchema.sstableWithLevel(gen++, 100, 101, 1, cfs)); + expectedL0.add(MockSchema.sstableWithLevel(gen++, 99, 100, 1, cfs)); + expectedL0.add(MockSchema.sstableWithLevel(gen++, 100, 100, 1, cfs)); + lm.addSSTables(expectedL0); + + assertLevelsEqual(expectedL0, lm.getLevel(0)); + assertTrue(expectedL0.stream().allMatch(s -> s.getSSTableLevel() == 0)); + assertLevelsEqual(expectedL1, lm.getLevel(1)); + assertTrue(expectedL1.stream().allMatch(s -> s.getSSTableLevel() == 1)); + + // should work: + expectedL1.add(MockSchema.sstableWithLevel(gen++, 98, 99, 1, cfs)); + 
expectedL1.add(MockSchema.sstableWithLevel(gen++, 101, 101, 1, cfs)); + lm.addSSTables(expectedL1.subList(1, expectedL1.size())); + assertLevelsEqual(expectedL1, lm.getLevel(1)); + } + + @Test + public void randomMultiLevelAddTest() + { + int iterations = 100; + int levelCount = 8; + + ColumnFamilyStore cfs = MockSchema.newCFS(); + LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions()); + long seed = System.currentTimeMillis(); + Random r = new Random(seed); + List<SSTableReader> newLevels = generateNewRandomLevels(cfs, 40, levelCount, 0, r); + + int sstableCount = newLevels.size(); + lm.addSSTables(newLevels); + + int [] expectedLevelSizes = lm.getAllLevelSize(); + + for (int j = 0; j < iterations; j++) + { + newLevels = generateNewRandomLevels(cfs, 20, levelCount, sstableCount, r); + sstableCount += newLevels.size(); + + int[] canAdd = canAdd(lm, newLevels, levelCount); + for (int i = 0; i < levelCount; i++) + expectedLevelSizes[i] += canAdd[i]; + lm.addSSTables(newLevels); + } + + // and verify no levels overlap + int actualSSTableCount = 0; + for (int i = 0; i < levelCount; i++) + { + actualSSTableCount += lm.getLevelSize(i); + List<SSTableReader> level = new ArrayList<>(lm.getLevel(i)); + int lvl = i; + assertTrue(level.stream().allMatch(s -> s.getSSTableLevel() == lvl)); + if (i > 0) + { + level.sort(SSTableReader.sstableComparator); + SSTableReader prev = null; + for (SSTableReader sstable : level) + { + if (prev != null && sstable.first.compareTo(prev.last) <= 0) + { + String levelStr = level.stream().map(s -> String.format("[%s, %s]", s.first, s.last)).collect(Collectors.joining(", ")); + String overlap = String.format("sstable [%s, %s] overlaps with [%s, %s] in level %d (%s) ", sstable.first, sstable.last, prev.first, prev.last, i, levelStr); + Assert.fail("[seed = "+seed+"] overlap in level "+lvl+": " + overlap); + } + prev = sstable; + } + } + } + assertEquals(sstableCount, actualSSTableCount); + for (int i = 0; 
i < levelCount; i++) + assertEquals("[seed = " + seed + "] wrong sstable count in level = " + i, expectedLevelSizes[i], lm.getLevel(i).size()); + } + + private static List<SSTableReader> generateNewRandomLevels(ColumnFamilyStore cfs, int maxSSTableCountPerLevel, int levelCount, int startGen, Random r) + { + List<SSTableReader> newLevels = new ArrayList<>(); + for (int level = 0; level < levelCount; level++) + { + int numLevelSSTables = r.nextInt(maxSSTableCountPerLevel) + 1; + List<Integer> tokens = new ArrayList<>(numLevelSSTables * 2); + + for (int i = 0; i < numLevelSSTables * 2; i++) + tokens.add(r.nextInt(4000)); + Collections.sort(tokens); + for (int i = 0; i < tokens.size() - 1; i += 2) + { + SSTableReader sstable = MockSchema.sstableWithLevel(++startGen, tokens.get(i), tokens.get(i + 1), level, cfs); + newLevels.add(sstable); + } + } + return newLevels; + } + + /** + * brute-force checks if the new sstables can be added to the correct level in manifest + * + * @return count of expected sstables to add to each level + */ + private static int[] canAdd(LeveledManifest lm, List<SSTableReader> newSSTables, int levelCount) + { + Map<Integer, Collection<SSTableReader>> sstableGroups = new HashMap<>(); + newSSTables.forEach(s -> sstableGroups.computeIfAbsent(s.getSSTableLevel(), k -> new ArrayList<>()).add(s)); + + int[] canAdd = new int[levelCount]; + for (Map.Entry<Integer, Collection<SSTableReader>> lvlGroup : sstableGroups.entrySet()) + { + int level = lvlGroup.getKey(); + if (level == 0) + { + canAdd[0] += lvlGroup.getValue().size(); + continue; + } + + List<SSTableReader> newLevel = new ArrayList<>(lm.getLevel(level)); + for (SSTableReader sstable : lvlGroup.getValue()) + { + newLevel.add(sstable); + newLevel.sort(SSTableReader.sstableComparator); + + SSTableReader prev = null; + boolean kept = true; + for (SSTableReader sst : newLevel) + { + if (prev != null && prev.last.compareTo(sst.first) >= 0) + { + newLevel.remove(sstable); + kept = false; + break; + } 
+ prev = sst; + } + if (kept) + canAdd[level] += 1; + else + canAdd[0] += 1; + } + } + return canAdd; + } + + private static void assertLevelsEqual(Collection<SSTableReader> l1, Collection<SSTableReader> l2) + { + assertEquals(l1.size(), l2.size()); + assertEquals(new HashSet<>(l1), new HashSet<>(l2)); + } } diff --cc test/unit/org/apache/cassandra/db/compaction/LeveledGenerationsTest.java index 0000000,1f17bf8..0e20d63 mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledGenerationsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledGenerationsTest.java @@@ -1,0 -1,199 +1,199 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.cassandra.db.compaction; + + import java.util.Collections; + import java.util.Iterator; + import java.util.List; + + import com.google.common.collect.Iterables; + import com.google.common.collect.Lists; + import org.junit.BeforeClass; + import org.junit.Test; + -import org.apache.cassandra.MockSchema; + import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.cql3.CQLTester; + import org.apache.cassandra.db.BufferDecoratedKey; + import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.DecoratedKey; + import org.apache.cassandra.dht.Murmur3Partitioner; + import org.apache.cassandra.io.sstable.format.SSTableReader; ++import org.apache.cassandra.schema.MockSchema; + import org.apache.cassandra.utils.ByteBufferUtil; + + import static junit.framework.Assert.assertFalse; + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.fail; + + public class LeveledGenerationsTest extends CQLTester + { + @BeforeClass + public static void setUp() + { + DatabaseDescriptor.daemonInitialization(); + MockSchema.cleanup(); + } + + @Test + public void testWrappingIterable() + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + + LeveledGenerations gens = new LeveledGenerations(); + + for (int i = 0; i < 10; i++) + { + SSTableReader sstable = MockSchema.sstable(i, 5, true, i, i, 2, cfs); + gens.addAll(Collections.singleton(sstable)); + } + int gen = 10; + assertIter(gens.wrappingIterator(2, sst(++gen, cfs, 5, 5)), + 6, 5, 10); + assertIter(gens.wrappingIterator(2, null), + 0, 9, 10); + assertIter(gens.wrappingIterator(2, sst(++gen, cfs, -10, 0)), + 1, 0, 10); + assertIter(gens.wrappingIterator(2, sst(++gen, cfs, 5, 9)), + 0, 9, 10); + assertIter(gens.wrappingIterator(2, sst(++gen, cfs, 0, 1000)), + 0, 9, 10); + + gens.addAll(Collections.singleton(MockSchema.sstable(100, 5, true, 5, 10, 3, cfs))); + assertIter(gens.wrappingIterator(3, sst(++gen, cfs, -10, 0)), + 5, 5, 1); + 
assertIter(gens.wrappingIterator(3, sst(++gen, cfs, 0, 100)), + 5, 5, 1); + + gens.addAll(Collections.singleton(MockSchema.sstable(200, 5, true, 5, 10, 4, cfs))); + gens.addAll(Collections.singleton(MockSchema.sstable(201, 5, true, 40, 50, 4, cfs))); + assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 0, 0)), + 5, 40, 2); + assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 0, 5)), + 40, 5, 2); + assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 7, 8)), + 40, 5, 2); + assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 39, 39)), + 40, 5, 2); + assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 40, 40)), + 5, 40, 2); + assertIter(gens.wrappingIterator(4, sst(++gen, cfs, 100, 1000)), + 5, 40, 2); + } + + @Test + public void testWrappingIterableWiderSSTables() + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + LeveledGenerations generations = new LeveledGenerations(); + int gen = 0; + generations.addAll(Lists.newArrayList( + sst(++gen, cfs, 0, 50), + sst(++gen, cfs, 51, 100), + sst(++gen, cfs, 150, 200))); + + assertIter(generations.wrappingIterator(2, sst(++gen, cfs, -100, -50)), + 0, 150, 3); + + assertIter(generations.wrappingIterator(2, sst(++gen, cfs, 0, 40)), + 51, 0, 3); + assertIter(generations.wrappingIterator(2, sst(++gen, cfs, 0, 50)), + 51, 0, 3); + assertIter(generations.wrappingIterator(2, sst(++gen, cfs, 0, 51)), + 150, 51, 3); + + assertIter(generations.wrappingIterator(2, sst(++gen, cfs, 100, 149)), + 150, 51, 3); + assertIter(generations.wrappingIterator(2, sst(++gen, cfs, 100, 300)), + 0, 150, 3); + + } + + @Test + public void testEmptyLevel() + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + LeveledGenerations generations = new LeveledGenerations(); + assertFalse(generations.wrappingIterator(3, sst(0, cfs, 0, 10)).hasNext()); + assertFalse(generations.wrappingIterator(3, null).hasNext()); + } + + @Test + public void testFillLevels() + { + LeveledGenerations generations = new LeveledGenerations(); + + ColumnFamilyStore cfs = 
MockSchema.newCFS(); + for (int i = 0; i < LeveledGenerations.MAX_LEVEL_COUNT; i++) + generations.addAll(Collections.singleton(MockSchema.sstableWithLevel(i, i, i, i, cfs))); + + for (int i = 0; i < generations.levelCount(); i++) + assertEquals(i, generations.get(i).iterator().next().getSSTableLevel()); + + assertEquals(9, generations.levelCount()); + + try + { + generations.get(9); + fail("don't have 9 generations"); + } + catch (ArrayIndexOutOfBoundsException e) + {} + try + { + generations.get(-1); + fail("don't have -1 generations"); + } + catch (ArrayIndexOutOfBoundsException e) + {} + } + + private void assertIter(Iterator<SSTableReader> iter, long first, long last, int expectedCount) + { + List<SSTableReader> drained = Lists.newArrayList(iter); + assertEquals(expectedCount, drained.size()); + assertEquals(dk(first).getToken(), first(drained).first.getToken()); + assertEquals(dk(last).getToken(), last(drained).first.getToken()); // we sort by first token, so this is the first token of the last sstable in iter + } + + private SSTableReader last(Iterable<SSTableReader> iter) + { + return Iterables.getLast(iter); + } + private SSTableReader first(Iterable<SSTableReader> iter) + { + SSTableReader first = Iterables.getFirst(iter, null); + if (first == null) + throw new RuntimeException(); + return first; + } + + private DecoratedKey dk(long x) + { + return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(x), ByteBufferUtil.bytes(x)); + } + private SSTableReader sst(int gen, ColumnFamilyStore cfs, long first, long last) + { + return MockSchema.sstable(gen, 5, true, first, last, 2, cfs); + } + + private void print(SSTableReader sstable) + { + System.out.println(String.format("%d %s %s %d", sstable.descriptor.generation, sstable.first, sstable.last, sstable.getSSTableLevel())); + } + } diff --cc test/unit/org/apache/cassandra/schema/MockSchema.java index 5ce8520,0000000..7d2d874 mode 100644,000000..100644 --- 
a/test/unit/org/apache/cassandra/schema/MockSchema.java +++ b/test/unit/org/apache/cassandra/schema/MockSchema.java @@@ -1,222 -1,0 +1,233 @@@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra.schema; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +import com.google.common.collect.ImmutableSet; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.IndexSummary; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.sstable.metadata.MetadataType; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.ChannelProxy; +import 
org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.Memory; +import org.apache.cassandra.utils.AlwaysPresentFilter; +import org.apache.cassandra.utils.ByteBufferUtil; + +import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE; + +public class MockSchema +{ + static + { + Memory offsets = Memory.allocate(4); + offsets.setInt(0, 0); + indexSummary = new IndexSummary(Murmur3Partitioner.instance, offsets, 0, Memory.allocate(4), 0, 0, 0, 1); + } + private static final AtomicInteger id = new AtomicInteger(); + public static final Keyspace ks = Keyspace.mockKS(KeyspaceMetadata.create("mockks", KeyspaceParams.simpleTransient(1))); + + public static final IndexSummary indexSummary; + + private static final File tempFile = temp("mocksegmentedfile"); + + public static Memtable memtable(ColumnFamilyStore cfs) + { + return new Memtable(cfs.metadata()); + } + + public static SSTableReader sstable(int generation, ColumnFamilyStore cfs) + { + return sstable(generation, false, cfs); + } + + public static SSTableReader sstable(int generation, long first, long last, ColumnFamilyStore cfs) + { + return sstable(generation, 0, false, first, last, cfs); + } + + public static SSTableReader sstable(int generation, boolean keepRef, ColumnFamilyStore cfs) + { + return sstable(generation, 0, keepRef, cfs); + } + + public static SSTableReader sstable(int generation, int size, ColumnFamilyStore cfs) + { + return sstable(generation, size, false, cfs); + } + public static SSTableReader sstable(int generation, int size, boolean keepRef, ColumnFamilyStore cfs) + { + return sstable(generation, size, keepRef, generation, generation, cfs); + } + ++ public static SSTableReader sstableWithLevel(int generation, long firstToken, long lastToken, int level, ColumnFamilyStore cfs) ++ { ++ return sstable(generation, 0, false, firstToken, lastToken, level, cfs); ++ } ++ + public static SSTableReader sstable(int 
generation, int size, boolean keepRef, long firstToken, long lastToken, ColumnFamilyStore cfs) + { ++ return sstable(generation, size, keepRef, firstToken, lastToken, 0, cfs); ++ } ++ ++ public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, int level, ColumnFamilyStore cfs) ++ { + Descriptor descriptor = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(), + cfs.keyspace.getName(), + cfs.getTableName(), + generation, SSTableFormat.Type.BIG); + Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC); + for (Component component : components) + { + File file = new File(descriptor.filenameFor(component)); + try + { + file.createNewFile(); + } + catch (IOException e) + { + } + } + // .complete() with size to make sstable.onDiskLength work + try (FileHandle.Builder builder = new FileHandle.Builder(new ChannelProxy(tempFile)).bufferSize(size); + FileHandle fileHandle = builder.complete(size)) + { + if (size > 0) + { + try + { + File file = new File(descriptor.filenameFor(Component.DATA)); + try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) + { + raf.setLength(size); + } + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + SerializationHeader header = SerializationHeader.make(cfs.metadata(), Collections.emptyList()); + StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata().comparator) + .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, UNREPAIRED_SSTABLE, null, false, header) + .get(MetadataType.STATS); + SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata, + fileHandle.sharedCopy(), fileHandle.sharedCopy(), indexSummary.sharedCopy(), + new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL, header); + reader.first = readerBounds(firstToken); + reader.last = readerBounds(lastToken); + if (!keepRef) + 
reader.selfRef().release(); + return reader; + } ++ + } + + public static ColumnFamilyStore newCFS() + { + return newCFS(ks.getName()); + } + + public static ColumnFamilyStore newCFS(String ksname) + { + return newCFS(newTableMetadata(ksname)); + } + + public static ColumnFamilyStore newCFS(Function<TableMetadata.Builder, TableMetadata.Builder> options) + { + return newCFS(ks.getName(), options); + } + + public static ColumnFamilyStore newCFS(String ksname, Function<TableMetadata.Builder, TableMetadata.Builder> options) + { + return newCFS(options.apply(newTableMetadataBuilder(ksname)).build()); + } + + public static ColumnFamilyStore newCFS(TableMetadata metadata) + { + return new ColumnFamilyStore(ks, metadata.name, 0, new TableMetadataRef(metadata), new Directories(metadata), false, false, false); + } + + public static TableMetadata newTableMetadata(String ksname) + { + return newTableMetadata(ksname, "mockcf" + (id.incrementAndGet())); + } + + public static TableMetadata newTableMetadata(String ksname, String cfname) + { + return newTableMetadataBuilder(ksname, cfname).build(); + } + + public static TableMetadata.Builder newTableMetadataBuilder(String ksname) + { + return newTableMetadataBuilder(ksname, "mockcf" + (id.incrementAndGet())); + } + + public static TableMetadata.Builder newTableMetadataBuilder(String ksname, String cfname) + { + return TableMetadata.builder(ksname, cfname) + .partitioner(Murmur3Partitioner.instance) + .addPartitionKeyColumn("key", UTF8Type.instance) + .addClusteringColumn("col", UTF8Type.instance) + .addRegularColumn("value", UTF8Type.instance) + .caching(CachingParams.CACHE_NOTHING); + } + + public static BufferDecoratedKey readerBounds(long generation) + { + return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(generation), ByteBufferUtil.EMPTY_BYTE_BUFFER); + } + + private static File temp(String id) + { + File file = FileUtils.createTempFile(id, "tmp"); + file.deleteOnExit(); + return file; + } + + public static void 
cleanup() + { + // clean up data directory which are stored as data directory/keyspace/data files + for (String dirName : DatabaseDescriptor.getAllDataFileLocations()) + { + File dir = new File(dirName); + if (!dir.exists()) + continue; + String[] children = dir.list(); + for (String child : children) + FileUtils.deleteRecursive(new File(dir, child)); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org