Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/702ec088 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/702ec088 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/702ec088 Branch: refs/heads/trunk Commit: 702ec088f5f61106b41e128f3fb8f109da8cbe1c Parents: 01b91cc 3d01e90 Author: Ariel Weisberg <aweisb...@apple.com> Authored: Mon Feb 13 13:31:23 2017 -0500 Committer: Ariel Weisberg <aweisb...@apple.com> Committed: Mon Feb 13 13:32:30 2017 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 27 +++++++++++++ .../org/apache/cassandra/config/Config.java | 3 +- .../cassandra/config/DatabaseDescriptor.java | 16 ++++++++ .../cassandra/net/OutboundTcpConnection.java | 4 +- .../cassandra/utils/CoalescingStrategies.java | 32 ++++++++++++---- .../utils/CoalescingStrategiesTest.java | 40 ++++++++++++++++++++ 7 files changed, 114 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 67c45e8,b19550a..8164a52 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -215,20 -100,6 +215,21 @@@ Merged from 3.0 * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776) * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545) Merged from 2.2: ++ * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090) + * Fix speculative retry bugs (CASSANDRA-13009) + * Fix handling of nulls and unsets in IN conditions (CASSANDRA-12981) + * Fix race causing infinite loop if Thrift server is stopped before it starts listening (CASSANDRA-12856) + * CompactionTasks now correctly drops sstables out of compaction when not enough disk 
space is available (CASSANDRA-12979) + * Remove support for non-JavaScript UDFs (CASSANDRA-12883) + * Fix DynamicEndpointSnitch noop in multi-datacenter situations (CASSANDRA-13074) + * cqlsh copy-from: encode column names to avoid primary key parsing errors (CASSANDRA-12909) + * Temporarily fix bug that creates commit log when running offline tools (CASSANDRA-8616) + * Reduce granuality of OpOrder.Group during index build (CASSANDRA-12796) + * Test bind parameters and unset parameters in InsertUpdateIfConditionTest (CASSANDRA-12980) + * Use saved tokens when setting local tokens on StorageService.joinRing (CASSANDRA-12935) + * cqlsh: fix DESC TYPES errors (CASSANDRA-12914) + * Fix leak on skipped SSTables in sstableupgrade (CASSANDRA-12899) + * Avoid blocking gossip during pending range calculation (CASSANDRA-12281) * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792) * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901) * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863) http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/conf/cassandra.yaml ---------------------------------------------------------------------- diff --cc conf/cassandra.yaml index 4436a02,a9d4c01..063a0b7 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@@ -1173,24 -959,29 +1173,51 @@@ gc_warn_threshold_in_ms: 100 # as corrupted. # max_value_size_in_mb: 256 +# Back-pressure settings # +# If enabled, the coordinator will apply the back-pressure strategy specified below to each mutation +# sent to replicas, with the aim of reducing pressure on overloaded replicas. +back_pressure_enabled: false +# The back-pressure strategy applied. +# The default implementation, RateBasedBackPressure, takes three arguments: +# high ratio, factor, and flow type, and uses the ratio between incoming mutation responses and outgoing mutation requests. 
+# If below high ratio, outgoing mutations are rate limited according to the incoming rate decreased by the given factor; +# if above high ratio, the rate limiting is increased by the given factor; +# such factor is usually best configured between 1 and 10, use larger values for a faster recovery +# at the expense of potentially more dropped mutations; +# the rate limiting is applied according to the flow type: if FAST, it's rate limited at the speed of the fastest replica, +# if SLOW at the speed of the slowest one. +# New strategies can be added. Implementors need to implement org.apache.cassandra.net.BackpressureStrategy and +# provide a public constructor accepting a Map<String, Object>. +back_pressure_strategy: + - class_name: org.apache.cassandra.net.RateBasedBackPressure + parameters: + - high_ratio: 0.90 + factor: 5 + flow: FAST ++ + # Coalescing Strategies # + # Coalescing multiples messages turns out to significantly boost message processing throughput (think doubling or more). + # On bare metal, the floor for packet processing throughput is high enough that many applications won't notice, but in + # virtualized environments, the point at which an application can be bound by network packet processing can be + # surprisingly low compared to the throughput of task processing that is possible inside a VM. It's not that bare metal + # doesn't benefit from coalescing messages, it's that the number of packets a bare metal network interface can process + # is sufficient for many applications such that no load starvation is experienced even without coalescing. + # There are other benefits to coalescing network messages that are harder to isolate with a simple metric like messages + # per second.
By coalescing multiple tasks together, a network thread can process multiple messages for the cost of one + # trip to read from a socket, and all the task submission work can be done at the same time reducing context switching + # and increasing cache friendliness of network message processing. + # See CASSANDRA-8692 for details. + + # Strategy to use for coalescing messages in OutboundTcpConnection. + # Can be fixed, movingaverage, timehorizon (default), disabled. + # You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name. + # otc_coalescing_strategy: TIMEHORIZON + + # How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first + # message is received before it will be sent with any accompanying messages. For moving average this is the + # maximum amount of time that will be waited as well as the interval at which messages must arrive on average + # for coalescing to be enabled. + # otc_coalescing_window_us: 200 + + # Do not try to coalesce messages if we already got that many messages. This should be more than 2 and less than 128. 
+ # otc_coalescing_enough_coalesced_messages: 8 http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 4261674,602214f..ce3adfe --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@@ -626,341 -655,45 +626,347 @@@ public class DatabaseDescripto catch (NumberFormatException e) { throw new ConfigurationException("key_cache_size_in_mb option was set incorrectly to '" - + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.", false); + + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.", false); + } + + try + { + // if counter_cache_size_in_mb option was set to "auto" then size of the cache should be "min(2.5% of Heap (in MB), 50MB) + counterCacheSizeInMB = (conf.counter_cache_size_in_mb == null) + ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.025 / 1024 / 1024)), 50) + : conf.counter_cache_size_in_mb; + + if (counterCacheSizeInMB < 0) + throw new NumberFormatException(); // to escape duplicating error message + } + catch (NumberFormatException e) + { + throw new ConfigurationException("counter_cache_size_in_mb option was set incorrectly to '" + + conf.counter_cache_size_in_mb + "', supported values are <integer> >= 0.", false); + } + + // if set to empty/"auto" then use 5% of Heap size + indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null) + ? 
Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)) + : conf.index_summary_capacity_in_mb; + + if (indexSummaryCapacityInMB < 0) + throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '" + + conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.", false); + + if (conf.index_interval != null) + logger.warn("index_interval has been deprecated and should be removed from cassandra.yaml"); + + if(conf.encryption_options != null) + { + logger.warn("Please rename encryption_options as server_encryption_options in the yaml"); + //operate under the assumption that server_encryption_options is not set in yaml rather than both + conf.server_encryption_options = conf.encryption_options; + } + + if (conf.user_defined_function_fail_timeout < 0) + throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false); + if (conf.user_defined_function_warn_timeout < 0) + throw new ConfigurationException("user_defined_function_warn_timeout must not be negative", false); + + if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout) + throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false); + + if (conf.max_mutation_size_in_kb == null) + conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2; + else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb) + throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false); + + // native transport encryption options + if (conf.native_transport_port_ssl != null + && conf.native_transport_port_ssl != conf.native_transport_port + && !conf.client_encryption_options.enabled) + { + throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false); + } 
+ + if (conf.max_value_size_in_mb <= 0) + throw new ConfigurationException("max_value_size_in_mb must be positive", false); + + switch (conf.disk_optimization_strategy) + { + case ssd: + diskOptimizationStrategy = new SsdDiskOptimizationStrategy(conf.disk_optimization_page_cross_chance); + break; + case spinning: + diskOptimizationStrategy = new SpinningDiskOptimizationStrategy(); + break; + } + + try + { + ParameterizedClass strategy = conf.back_pressure_strategy != null ? conf.back_pressure_strategy : RateBasedBackPressure.withDefaultParams(); + Class<?> clazz = Class.forName(strategy.class_name); + if (!BackPressureStrategy.class.isAssignableFrom(clazz)) + throw new ConfigurationException(strategy + " is not an instance of " + BackPressureStrategy.class.getCanonicalName(), false); + + Constructor<?> ctor = clazz.getConstructor(Map.class); + BackPressureStrategy instance = (BackPressureStrategy) ctor.newInstance(strategy.parameters); + logger.info("Back-pressure is {} with strategy {}.", backPressureEnabled() ? 
"enabled" : "disabled", conf.back_pressure_strategy); + backPressureStrategy = instance; + } + catch (ConfigurationException ex) + { + throw ex; + } + catch (Exception ex) + { + throw new ConfigurationException("Error configuring back-pressure strategy: " + conf.back_pressure_strategy, ex); + } ++ ++ if (conf.otc_coalescing_enough_coalesced_messages > 128) ++ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false); ++ ++ if (conf.otc_coalescing_enough_coalesced_messages <= 0) ++ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false); + } + + private static String storagedirFor(String type) + { + return storagedir(type + "_directory") + File.separator + type; + } + + private static String storagedir(String errMsgType) + { + String storagedir = System.getProperty(Config.PROPERTY_PREFIX + "storagedir", null); + if (storagedir == null) + throw new ConfigurationException(errMsgType + " is missing and -Dcassandra.storagedir is not set", false); + return storagedir; + } + + public static void applyAddressConfig() throws ConfigurationException + { + applyAddressConfig(conf); + } + + public static void applyAddressConfig(Config config) throws ConfigurationException + { + listenAddress = null; + rpcAddress = null; + broadcastAddress = null; + broadcastRpcAddress = null; + + /* Local IP, hostname or interface to bind services to */ + if (config.listen_address != null && config.listen_interface != null) + { + throw new ConfigurationException("Set listen_address OR listen_interface, not both", false); + } + else if (config.listen_address != null) + { + try + { + listenAddress = InetAddress.getByName(config.listen_address); + } + catch (UnknownHostException e) + { + throw new ConfigurationException("Unknown listen_address '" + config.listen_address + "'", false); + } + + if (listenAddress.isAnyLocalAddress()) + throw new ConfigurationException("listen_address cannot be a wildcard 
address (" + config.listen_address + ")!", false); + } + else if (config.listen_interface != null) + { + listenAddress = getNetworkInterfaceAddress(config.listen_interface, "listen_interface", config.listen_interface_prefer_ipv6); + } + + /* Gossip Address to broadcast */ + if (config.broadcast_address != null) + { + try + { + broadcastAddress = InetAddress.getByName(config.broadcast_address); + } + catch (UnknownHostException e) + { + throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + "'", false); + } + + if (broadcastAddress.isAnyLocalAddress()) + throw new ConfigurationException("broadcast_address cannot be a wildcard address (" + config.broadcast_address + ")!", false); + } + + /* Local IP, hostname or interface to bind RPC server to */ + if (config.rpc_address != null && config.rpc_interface != null) + { + throw new ConfigurationException("Set rpc_address OR rpc_interface, not both", false); + } + else if (config.rpc_address != null) + { + try + { + rpcAddress = InetAddress.getByName(config.rpc_address); + } + catch (UnknownHostException e) + { + throw new ConfigurationException("Unknown host in rpc_address " + config.rpc_address, false); + } + } + else if (config.rpc_interface != null) + { + rpcAddress = getNetworkInterfaceAddress(config.rpc_interface, "rpc_interface", config.rpc_interface_prefer_ipv6); + } + else + { + rpcAddress = FBUtilities.getLocalAddress(); + } + + /* RPC address to broadcast */ + if (config.broadcast_rpc_address != null) + { + try + { + broadcastRpcAddress = InetAddress.getByName(config.broadcast_rpc_address); + } + catch (UnknownHostException e) + { + throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false); + } + + if (broadcastRpcAddress.isAnyLocalAddress()) + throw new ConfigurationException("broadcast_rpc_address cannot be a wildcard address (" + config.broadcast_rpc_address + ")!", false); + } + else + { + if 
(rpcAddress.isAnyLocalAddress()) + throw new ConfigurationException("If rpc_address is set to a wildcard address (" + config.rpc_address + "), then " + + "you must set broadcast_rpc_address to a value other than " + config.rpc_address, false); + } + } + + public static void applyThriftHSHA() + { + // fail early instead of OOMing (see CASSANDRA-8116) + if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads == Integer.MAX_VALUE) + throw new ConfigurationException("The hsha rpc_server_type is not compatible with an rpc_max_threads " + + "setting of 'unlimited'. Please see the comments in cassandra.yaml " + + "for rpc_server_type and rpc_max_threads.", + false); + if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads > (FBUtilities.getAvailableProcessors() * 2 + 1024)) + logger.warn("rpc_max_threads setting of {} may be too high for the hsha server and cause unnecessary thread contention, reducing performance", conf.rpc_max_threads); + } + + public static void applyEncryptionContext() + { + // always attempt to load the cipher factory, as we could be in the situation where the user has disabled encryption, + // but has existing commitlogs and sstables on disk that are still encrypted (and still need to be read) + encryptionContext = new EncryptionContext(conf.transparent_data_encryption_options); + } + + public static void applySeedProvider() + { + // load the seeds for node contact points + if (conf.seed_provider == null) + { + throw new ConfigurationException("seeds configuration is missing; a minimum of one seed is required.", false); + } + try + { + Class<?> seedProviderClass = Class.forName(conf.seed_provider.class_name); + seedProvider = (SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters); + } + // there are about 5 checked exceptions that could be thrown here. 
+ catch (Exception e) + { + throw new ConfigurationException(e.getMessage() + "\nFatal configuration error; unable to start server. See log for stacktrace.", true); + } + if (seedProvider.getSeeds().size() == 0) + throw new ConfigurationException("The seed provider lists no seeds.", false); + } + + public static void applyInitialTokens() + { + if (conf.initial_token != null) + { + Collection<String> tokens = tokensFromString(conf.initial_token); + if (tokens.size() != conf.num_tokens) + throw new ConfigurationException("The number of initial tokens (by initial_token) specified is different from num_tokens value", false); + + for (String token : tokens) + partitioner.getTokenFactory().validate(token); + } + } + + // Maybe safe for clients + tools + public static void applyRequestScheduler() + { + /* Request Scheduler setup */ + requestSchedulerOptions = conf.request_scheduler_options; + if (conf.request_scheduler != null) + { + try + { + if (requestSchedulerOptions == null) + { + requestSchedulerOptions = new RequestSchedulerOptions(); + } + Class<?> cls = Class.forName(conf.request_scheduler); + requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions); + } + catch (ClassNotFoundException e) + { + throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler, false); + } + catch (Exception e) + { + throw new ConfigurationException("Unable to instantiate request scheduler", e); + } + } + else + { + requestScheduler = new NoScheduler(); } - try + if (conf.request_scheduler_id == RequestSchedulerId.keyspace) { - // if counter_cache_size_in_mb option was set to "auto" then size of the cache should be "min(2.5% of Heap (in MB), 50MB) - counterCacheSizeInMB = (conf.counter_cache_size_in_mb == null) - ? 
Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.025 / 1024 / 1024)), 50) - : conf.counter_cache_size_in_mb; - - if (counterCacheSizeInMB < 0) - throw new NumberFormatException(); // to escape duplicating error message + requestSchedulerId = conf.request_scheduler_id; } - catch (NumberFormatException e) + else { - throw new ConfigurationException("counter_cache_size_in_mb option was set incorrectly to '" - + conf.counter_cache_size_in_mb + "', supported values are <integer> >= 0.", false); + // Default to Keyspace + requestSchedulerId = RequestSchedulerId.keyspace; } + } - // if set to empty/"auto" then use 5% of Heap size - indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null) - ? Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)) - : conf.index_summary_capacity_in_mb; - - if (indexSummaryCapacityInMB < 0) - throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '" - + conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.", false); - - if(conf.encryption_options != null) + // definitely not safe for tools + clients - implicitly instantiates StorageService + public static void applySnitch() + { + /* end point snitch */ + if (conf.endpoint_snitch == null) { - logger.warn("Please rename encryption_options as server_encryption_options in the yaml"); - //operate under the assumption that server_encryption_options is not set in yaml rather than both - conf.server_encryption_options = conf.encryption_options; + throw new ConfigurationException("Missing endpoint_snitch directive", false); } + snitch = createEndpointSnitch(conf.dynamic_snitch, conf.endpoint_snitch); + EndpointSnitchInfo.create(); - // load the seeds for node contact points - if (conf.seed_provider == null) + localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress()); + localComparator = new Comparator<InetAddress>() { - throw new ConfigurationException("seeds configuration is 
missing; a minimum of one seed is required.", false); + public int compare(InetAddress endpoint1, InetAddress endpoint2) + { + boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1)); + boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2)); + if (local1 && !local2) + return -1; + if (local2 && !local1) + return 1; + return 0; + } + }; + } + + // definitely not safe for tools + clients - implicitly instantiates schema + public static void applyPartitioner() + { + /* Hashing strategy */ + if (conf.partitioner == null) + { + throw new ConfigurationException("Missing directive: partitioner", false); } try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/src/java/org/apache/cassandra/utils/CoalescingStrategies.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/utils/CoalescingStrategies.java index 1a3c13d,d79fa15..9f3b118 --- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java +++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java @@@ -17,10 -17,12 +17,13 @@@ */ package org.apache.cassandra.utils; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.Config; + import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.util.FileUtils; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; import java.io.File; import java.io.RandomAccessFile; http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java index 97d15fe,26b6b3a..b10d70b --- 
a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java +++ b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java @@@ -101,6 -103,12 +103,12 @@@ public class CoalescingStrategiesTes Semaphore queueParked = new Semaphore(0); Semaphore queueRelease = new Semaphore(0); + @BeforeClass + public static void initDD() + { - DatabaseDescriptor.forceStaticInitialization(); ++ DatabaseDescriptor.daemonInitialization(); + } + @SuppressWarnings({ "serial" }) @Before public void setUp() throws Exception