This is an automated email from the ASF dual-hosted git repository.

edimitrova pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
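At a glance, the patch below renames the data-rate throttle settings and re-types them as DataRateSpec values. A minimal before/after sketch of the cassandra.yaml defaults touched by this commit (illustrative only, drawn from the diff; it is not the full list of renamed parameters):

    # old names (pre-CASSANDRA-15234), still accepted via the @Replaces converters
    compaction_throughput_mb_per_sec: 64
    stream_throughput_outbound_megabits_per_sec: 200
    inter_dc_stream_throughput_outbound_megabits_per_sec: 200

    # new names, expressed as DataRateSpec values
    compaction_throughput: 64MiB/s
    stream_throughput_outbound: 24MiB/s
    inter_dc_stream_throughput_outbound: 24MiB/s

The old names continue to load through the converters added in Config.java, with the megabit-based settings translated to mebibytes per second (200 megabits/s is roughly 24 MiB/s).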
commit 5bb4bab12f8edfef95ed13cbabf8c0f377986065 Author: Ekaterina Dimitrova <ekaterina.dimitr...@datastax.com> AuthorDate: Mon Jan 31 21:51:49 2022 -0500 DataRate parameters transition to the new framework Fix the DB descriptorRefTest which failed on the previous commit patch by Ekaterina Dimitrova; reviewed by Caleb Rackliffe, David Capwell, Michael Semb Wever and Benjamin Lerer for CASSANDRA-15234 --- .build/build-rat.xml | 2 +- conf/cassandra.yaml | 24 +++--- src/java/org/apache/cassandra/config/Config.java | 13 +-- .../org/apache/cassandra/config/Converters.java | 4 +- .../org/apache/cassandra/config/DataRateSpec.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 57 ++++++++++---- .../cassandra/db/compaction/CompactionManager.java | 10 +-- .../apache/cassandra/service/StorageService.java | 61 ++++++++++---- .../cassandra/service/StorageServiceMBean.java | 17 +++- .../apache/cassandra/streaming/StreamManager.java | 36 ++++----- .../streaming/StreamingDataOutputPlus.java | 2 +- .../org/apache/cassandra/tools/LoaderOptions.java | 12 +-- src/java/org/apache/cassandra/tools/NodeProbe.java | 16 ++-- .../tools/nodetool/GetCompactionThroughput.java | 4 +- .../tools/nodetool/GetInterDCStreamThroughput.java | 6 +- .../tools/nodetool/GetStreamThroughput.java | 6 +- .../tools/nodetool/SetCompactionThroughput.java | 4 +- .../tools/nodetool/SetInterDCStreamThroughput.java | 4 +- .../tools/nodetool/SetStreamThroughput.java | 4 +- test/conf/cassandra-murmur.yaml | 2 +- ...ed_parameters_names.yaml => cassandra-old.yaml} | 2 +- test/conf/cassandra-seeds.yaml | 2 +- ...dra-sslcontextfactory-invalidconfiguration.yaml | 4 +- test/conf/cassandra-sslcontextfactory.yaml | 4 +- test/conf/cassandra.yaml | 4 +- test/conf/unit-test-conf/test-native-port.yaml | 2 +- .../test/AbstractNetstatsBootstrapStreaming.java | 8 +- ...WithEntireSSTablesCompressionStreamingTest.java | 2 +- .../test/NetstatsRepairStreamingTest.java | 4 +- .../cassandra/streaming/LongStreamingTest.java | 20 ++--- .../microbench/ZeroCopyStreamingBenchmark.java | 2 +- .../config/DatabaseDescriptorRefTest.java | 14 +++- .../LoadOldYAMLBackwardCompatibilityTest.java | 89 ++++++++++++++++++++- .../miscellaneous/CrcCheckChanceTest.java | 2 +- .../net/AsyncStreamingOutputPlusTest.java | 8 +- .../cassandra/streaming/StreamManagerTest.java | 92 ++++++++++------------ .../cassandra/streaming/StreamRateLimiterTest.java | 32 ++++---- ...st.java => SetGetCompactionThroughputTest.java} | 41 ++++------ ...etEntireSSTableInterDCStreamThroughputTest.java | 12 +-- .../SetGetEntireSSTableStreamThroughputTest.java | 12 +-- .../SetGetInterDCStreamThroughputTest.java | 26 +++--- .../tools/nodetool/SetGetStreamThroughputTest.java | 26 +++--- 42 files changed, 422 insertions(+), 272 deletions(-) diff --git a/.build/build-rat.xml b/.build/build-rat.xml index a1a17cd..599d5ea 100644 --- a/.build/build-rat.xml +++ b/.build/build-rat.xml @@ -55,7 +55,7 @@ <exclude name="**/cassandra-seeds.yaml"/> <exclude NAME="**/doc/antora.yml"/> <exclude name="**/test/conf/cassandra.yaml"/> - <exclude name="**/test/conf/cassandra_deprecated_parameters_names.yaml"/> + <exclude name="**/test/conf/cassandra-old.yaml"/> <exclude name="**/test/conf/cassandra_encryption.yaml"/> <exclude name="**/test/conf/cdc.yaml"/> <exclude name="**/test/conf/commitlog_compression_LZ4.yaml"/> diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index a18a5b0..8741b9b 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -896,7 +896,7 @@ column_index_cache_size_in_kb: 2 # during a single 
long running compactions. The default is usually # fine and if you experience problems with compaction running too # slowly or too fast, you should look at -# compaction_throughput_mb_per_sec first. +# compaction_throughput first. # # concurrent_compactors defaults to the smaller of (number of disks, # number of cores), with a minimum of 2 and a maximum of 8. @@ -924,7 +924,7 @@ concurrent_materialized_view_builders: 1 # Setting this to 0 disables throttling. Note that this accounts for all types # of compaction, including validation compaction (building Merkle trees # for repairs). -compaction_throughput_mb_per_sec: 64 +compaction_throughput: 64MiB/s # When compacting, the replacement sstable(s) can be opened before they # are completely written, and used in place of the prior sstables for @@ -935,8 +935,8 @@ sstable_preemptive_open_interval_in_mb: 50 # When enabled, permits Cassandra to zero-copy stream entire eligible # SSTables between nodes, including every component. # This speeds up the network transfer significantly subject to -# throttling specified by entire_sstable_stream_throughput_outbound_megabits_per_sec, -# and entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec +# throttling specified by entire_sstable_stream_throughput_outbound, +# and entire_sstable_inter_dc_stream_throughput_outbound # for inter-DC transfers. # Enabling this will reduce the GC pressure on sending and receiving node. # When unset, the default is enabled. While this feature tries to keep the @@ -947,27 +947,27 @@ sstable_preemptive_open_interval_in_mb: 50 # Throttles entire SSTable outbound streaming file transfers on # this node to the given total throughput in Mbps. # Setting this value to 0 it disables throttling. -# When unset, the default is 200 Mbps or 25 MB/s. -# entire_sstable_stream_throughput_outbound_megabits_per_sec: 200 +# When unset, the default is 200 Mbps or 24 MiB/s. +# entire_sstable_stream_throughput_outbound: 24MiB/s # Throttles entire SSTable file streaming between datacenters. # Setting this value to 0 disables throttling for entire SSTable inter-DC file streaming. -# When unset, the default is 200 Mbps or 25 MB/s. -# entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec: 200 +# When unset, the default is 200 Mbps or 24 MiB/s. +# entire_sstable_inter_dc_stream_throughput_outbound: 24MiB/s # Throttles all outbound streaming file transfers on this node to the # given total throughput in Mbps. This is necessary because Cassandra does # mostly sequential IO when streaming data during bootstrap or repair, which # can lead to saturating the network connection and degrading rpc performance. -# When unset, the default is 200 Mbps or 25 MB/s. -# stream_throughput_outbound_megabits_per_sec: 200 +# When unset, the default is 200 Mbps or 24 MiB/s. +# stream_throughput_outbound: 24MiB/s # Throttles all streaming file transfer between the datacenters, # this setting allows users to throttle inter dc stream throughput in addition # to throttling all network stream traffic as configured with # stream_throughput_outbound_megabits_per_sec -# When unset, the default is 200 Mbps or 25 MB/s. -# inter_dc_stream_throughput_outbound_megabits_per_sec: 200 +# When unset, the default is 200 Mbps or 24 MiB/s. +# inter_dc_stream_throughput_outbound: 24MiB/s # Server side timeouts for requests. 
The server will return a timeout exception # to the client if it can't complete an operation within the corresponding diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 25f015a..4f6e27e 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -229,7 +229,8 @@ public class Config public volatile int batch_size_fail_threshold_in_kb = 50; public Integer unlogged_batch_across_partitions_warn_threshold = 10; public volatile Integer concurrent_compactors; - public volatile int compaction_throughput_mb_per_sec = 16; + @Replaces(oldName = "compaction_throughput_mb_per_sec", converter = Converters.MEBIBYTES_PER_SECOND_DATA_RATE, deprecated = true) + public volatile DataRateSpec compaction_throughput = new DataRateSpec("16MiB/s"); public volatile int compaction_large_partition_warning_threshold_mb = 100; public int min_free_space_per_drive_in_mb = 50; public volatile Integer compaction_tombstone_warning_threshold = 100000; @@ -243,11 +244,13 @@ public class Config @Deprecated public int max_streaming_retries = 3; - public volatile int stream_throughput_outbound_megabits_per_sec = 200; - public volatile int inter_dc_stream_throughput_outbound_megabits_per_sec = 200; + @Replaces(oldName = "stream_throughput_outbound_megabits_per_sec", converter = Converters.MEGABITS_TO_MEBIBYTES_PER_SECOND_DATA_RATE, deprecated = true) + public volatile DataRateSpec stream_throughput_outbound = new DataRateSpec("24MiB/s"); + @Replaces(oldName = "inter_dc_stream_throughput_outbound_megabits_per_sec", converter = Converters.MEGABITS_TO_MEBIBYTES_PER_SECOND_DATA_RATE, deprecated = true) + public volatile DataRateSpec inter_dc_stream_throughput_outbound = new DataRateSpec("24MiB/s"); - public volatile int entire_sstable_stream_throughput_outbound_megabits_per_sec = 200; - public volatile int entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec = 200; + public volatile DataRateSpec entire_sstable_stream_throughput_outbound = new DataRateSpec("24MiB/s"); + public volatile DataRateSpec entire_sstable_inter_dc_stream_throughput_outbound = new DataRateSpec("24MiB/s"); public String[] data_file_directories = new String[0]; diff --git a/src/java/org/apache/cassandra/config/Converters.java b/src/java/org/apache/cassandra/config/Converters.java index 9a832cc..629e3d3 100644 --- a/src/java/org/apache/cassandra/config/Converters.java +++ b/src/java/org/apache/cassandra/config/Converters.java @@ -65,14 +65,14 @@ public enum Converters o -> ((DataStorageSpec)o).toBytes()), MEBIBYTES_PER_SECOND_DATA_RATE(Long.class, o -> DataRateSpec.inMebibytesPerSecond((Long) o), - o -> ((DataRateSpec)o).toMebibytesPerSecond()), + o -> ((DataRateSpec)o).toMebibytesPerSecondAsInt()), /** * This converter is a custom one to support backward compatibility for stream_throughput_outbound and * inter_dc_stream_throughput_outbound which were provided in megatibs per second prior CASSANDRA-15234. 
*/ MEGABITS_TO_MEBIBYTES_PER_SECOND_DATA_RATE(Long.class, o -> DataRateSpec.megabitsPerSecondInMebibytesPerSecond((Long)o), - o -> ((DataRateSpec)o).toMegabitsPerSecond()); + o -> ((DataRateSpec)o).toMegabitsPerSecondAsInt()); private final Class<?> inputType; private final Function<Object, Object> convert; diff --git a/src/java/org/apache/cassandra/config/DataRateSpec.java b/src/java/org/apache/cassandra/config/DataRateSpec.java index 3512513..bbdbecc 100644 --- a/src/java/org/apache/cassandra/config/DataRateSpec.java +++ b/src/java/org/apache/cassandra/config/DataRateSpec.java @@ -300,6 +300,8 @@ public final class DataRateSpec public double toMegabitsPerSecond(double d) { + + if (d > MAX / (MEGABITS_PER_MEBIBYTE)) return MAX; return Math.round(d * MEGABITS_PER_MEBIBYTE); diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 530482e..021b0c6 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1841,14 +1841,19 @@ public class DatabaseDescriptor conf.concurrent_compactors = value; } - public static int getCompactionThroughputMbPerSec() + public static int getCompactionThroughputMebibytesPerSecAsInt() { - return conf.compaction_throughput_mb_per_sec; + return conf.compaction_throughput.toMebibytesPerSecondAsInt(); } - public static void setCompactionThroughputMbPerSec(int value) + public static double getCompactionThroughputMebibytesPerSec() { - conf.compaction_throughput_mb_per_sec = value; + return conf.compaction_throughput.toMebibytesPerSecond(); + } + + public static void setCompactionThroughputMebibytesPerSec(int value) + { + conf.compaction_throughput = DataRateSpec.inMebibytesPerSecond(value); } public static long getCompactionLargePartitionWarningThreshold() { return ByteUnit.MEBI_BYTES.toBytes(conf.compaction_large_partition_warning_threshold_mb); } @@ -1901,42 +1906,62 @@ public class DatabaseDescriptor public static int getStreamThroughputOutboundMegabitsPerSec() { - return conf.stream_throughput_outbound_megabits_per_sec; + return conf.stream_throughput_outbound.toMegabitsPerSecondAsInt(); + } + + public static double getStreamThroughputOutboundMebibytesPerSec() + { + return conf.stream_throughput_outbound.toMebibytesPerSecond(); } public static void setStreamThroughputOutboundMegabitsPerSec(int value) { - conf.stream_throughput_outbound_megabits_per_sec = value; + conf.stream_throughput_outbound = DataRateSpec.megabitsPerSecondInMebibytesPerSecond(value); } - public static int getEntireSSTableStreamThroughputOutboundMegabitsPerSec() + public static int getEntireSSTableStreamThroughputOutboundMebibytesPerSecAsInt() { - return conf.entire_sstable_stream_throughput_outbound_megabits_per_sec; + return conf.entire_sstable_stream_throughput_outbound.toMebibytesPerSecondAsInt(); } - public static void setEntireSSTableStreamThroughputOutboundMegabitsPerSec(int value) + public static double getEntireSSTableStreamThroughputOutboundMebibytesPerSec() { - conf.entire_sstable_stream_throughput_outbound_megabits_per_sec = value; + return conf.entire_sstable_stream_throughput_outbound.toMebibytesPerSecond(); + } + + public static void setEntireSSTableStreamThroughputOutboundMebibytesPerSec(int value) + { + conf.entire_sstable_stream_throughput_outbound = DataRateSpec.inMebibytesPerSecond(value); } public static int getInterDCStreamThroughputOutboundMegabitsPerSec() { - return 
conf.inter_dc_stream_throughput_outbound_megabits_per_sec; + return conf.inter_dc_stream_throughput_outbound.toMegabitsPerSecondAsInt(); + } + + public static double getInterDCStreamThroughputOutboundMebibytesPerSec() + { + return conf.inter_dc_stream_throughput_outbound.toMebibytesPerSecond(); } public static void setInterDCStreamThroughputOutboundMegabitsPerSec(int value) { - conf.inter_dc_stream_throughput_outbound_megabits_per_sec = value; + conf.inter_dc_stream_throughput_outbound = DataRateSpec.megabitsPerSecondInMebibytesPerSecond(value); + } + + public static double getEntireSSTableInterDCStreamThroughputOutboundMebibytesPerSec() + { + return conf.entire_sstable_inter_dc_stream_throughput_outbound.toMebibytesPerSecond(); } - public static int getEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec() + public static int getEntireSSTableInterDCStreamThroughputOutboundMebibytesPerSecAsInt() { - return conf.entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec; + return conf.entire_sstable_inter_dc_stream_throughput_outbound.toMebibytesPerSecondAsInt(); } - public static void setEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec(int value) + public static void setEntireSSTableInterDCStreamThroughputOutboundMebibytesPerSec(int value) { - conf.entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec = value; + conf.entire_sstable_inter_dc_stream_throughput_outbound = DataRateSpec.inMebibytesPerSecond(value); } /** diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index f1e8af5..6b82321 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -151,18 +151,18 @@ public class CompactionManager implements CompactionManagerMBean */ public RateLimiter getRateLimiter() { - setRate(DatabaseDescriptor.getCompactionThroughputMbPerSec()); + setRate(DatabaseDescriptor.getCompactionThroughputMebibytesPerSec()); return compactionRateLimiter; } /** - * Sets the rate for the rate limiter. When compaction_throughput_mb_per_sec is 0 or node is bootstrapping, + * Sets the rate for the rate limiter. When compaction_throughput is 0 or node is bootstrapping, * this sets the rate to Double.MAX_VALUE bytes per second. 
- * @param throughPutMbPerSec throughput to set in mb per second + * @param throughPutMiBPerSec throughput to set in MiB/s */ - public void setRate(final double throughPutMbPerSec) + public void setRate(final double throughPutMiBPerSec) { - double throughput = throughPutMbPerSec * 1024.0 * 1024.0; + double throughput = throughPutMiBPerSec * 1024.0 * 1024.0; // if throughput is set to 0, throttling is disabled if (throughput == 0 || StorageService.instance.isBootstrapMode()) throughput = Double.MAX_VALUE; diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 4c0562a..9d6a295 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1497,67 +1497,96 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return DatabaseDescriptor.getTruncateRpcTimeout(MILLISECONDS); } + @Deprecated public void setStreamThroughputMbPerSec(int value) { + setStreamThroughputMbitPerSec(value); + } + + public void setStreamThroughputMbitPerSec(int value) + { int oldValue = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec(); DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(value); StreamManager.StreamRateLimiter.updateThroughput(); - logger.info("setstreamthroughput: throttle set to {}{} Mb/s (was {} Mb/s)", value, value <= 0 ? " (unlimited)" : "", oldValue); + logger.info("setstreamthroughput: throttle set to {}{} megabits per second (was {} megabits per second)", + value, value <= 0 ? " (unlimited)" : "", oldValue); } + @Deprecated public int getStreamThroughputMbPerSec() { + return getStreamThroughputMbitPerSec(); + } + + public int getStreamThroughputMbitPerSec() + { return DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec(); } - public void setEntireSSTableStreamThroughputMbPerSec(int value) + public void setEntireSSTableStreamThroughputMebibytesPerSec(int value) { - int oldValue = DatabaseDescriptor.getEntireSSTableStreamThroughputOutboundMegabitsPerSec(); - DatabaseDescriptor.setEntireSSTableStreamThroughputOutboundMegabitsPerSec(value); + int oldValue = DatabaseDescriptor.getEntireSSTableStreamThroughputOutboundMebibytesPerSecAsInt(); + DatabaseDescriptor.setEntireSSTableStreamThroughputOutboundMebibytesPerSec(value); StreamManager.StreamRateLimiter.updateEntireSSTableThroughput(); - logger.info("setstreamthroughput (entire SSTable): throttle set to {}{} Mb/s (was {} Mb/s)", value, value <= 0 ? " (unlimited)" : "", oldValue); + logger.info("setstreamthroughput (entire SSTable): throttle set to {}{} MiB/s (was {} MiB/s)", + value, value <= 0 ? " (unlimited)" : "", oldValue); } - public int getEntireSSTableStreamThroughputMbPerSec() + public int getEntireSSTableStreamThroughputMebibytesPerSec() { - return DatabaseDescriptor.getEntireSSTableStreamThroughputOutboundMegabitsPerSec(); + return DatabaseDescriptor.getEntireSSTableStreamThroughputOutboundMebibytesPerSecAsInt(); } + @Deprecated public void setInterDCStreamThroughputMbPerSec(int value) { + setInterDCStreamThroughputMbitPerSec(value); + } + + public void setInterDCStreamThroughputMbitPerSec(int value) + { int oldValue = DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec(); DatabaseDescriptor.setInterDCStreamThroughputOutboundMegabitsPerSec(value); StreamManager.StreamRateLimiter.updateInterDCThroughput(); - logger.info("setinterdcstreamthroughput: throttle set to {}{} Mb/s (was {} Mb/s)", value, value <= 0 ? 
" (unlimited)" : "", oldValue); + logger.info("setinterdcstreamthroughput: throttle set to {}{} megabits per second (was {} megabits per second)", value, value <= 0 ? " (unlimited)" : "", oldValue); } + @Deprecated public int getInterDCStreamThroughputMbPerSec() { + return getInterDCStreamThroughputMbitPerSec(); + } + + public int getInterDCStreamThroughputMbitPerSec() + { return DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec(); } - public void setEntireSSTableInterDCStreamThroughputMbPerSec(int value) + public void setEntireSSTableInterDCStreamThroughputMebibytesPerSec(int value) { - int oldValue = DatabaseDescriptor.getEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec(); - DatabaseDescriptor.setEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec(value); + int oldValue = DatabaseDescriptor.getEntireSSTableInterDCStreamThroughputOutboundMebibytesPerSecAsInt(); + DatabaseDescriptor.setEntireSSTableInterDCStreamThroughputOutboundMebibytesPerSec(value); StreamManager.StreamRateLimiter.updateEntireSSTableInterDCThroughput(); - logger.info("setinterdcstreamthroughput (entire SSTable): throttle set to {}{} Mb/s (was {} Mb/s)", value, value <= 0 ? " (unlimited)" : "", oldValue); + logger.info("setinterdcstreamthroughput (entire SSTable): throttle set to {}{} MiB/s (was {} MiB/s)", value, value <= 0 ? " (unlimited)" : "", oldValue); } - public int getEntireSSTableInterDCStreamThroughputMbPerSec() + public int getEntireSSTableInterDCStreamThroughputMebibytesPerSec() { - return DatabaseDescriptor.getEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec(); + return DatabaseDescriptor.getEntireSSTableInterDCStreamThroughputOutboundMebibytesPerSecAsInt(); } public int getCompactionThroughputMbPerSec() { - return DatabaseDescriptor.getCompactionThroughputMbPerSec(); + return DatabaseDescriptor.getCompactionThroughputMebibytesPerSecAsInt(); } public void setCompactionThroughputMbPerSec(int value) { - DatabaseDescriptor.setCompactionThroughputMbPerSec(value); + int oldValue = DatabaseDescriptor.getCompactionThroughputMebibytesPerSecAsInt(); + DatabaseDescriptor.setCompactionThroughputMebibytesPerSec(value); CompactionManager.instance.setRate(value); + logger.info("compactionthroughput: throttle set to {} mebibytes per second (was {} mebibytes per second)", + value, oldValue); } public int getBatchlogReplayThrottleInKB() diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 0f3ac51..8e386ec 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -618,17 +618,26 @@ public interface StorageServiceMBean extends NotificationEmitter public void setTruncateRpcTimeout(long value); public long getTruncateRpcTimeout(); + public void setStreamThroughputMbitPerSec(int value); + public int getStreamThroughputMbitPerSec(); + + @Deprecated public void setStreamThroughputMbPerSec(int value); + @Deprecated public int getStreamThroughputMbPerSec(); + public void setInterDCStreamThroughputMbitPerSec(int value); + public int getInterDCStreamThroughputMbitPerSec(); + @Deprecated public void setInterDCStreamThroughputMbPerSec(int value); + @Deprecated public int getInterDCStreamThroughputMbPerSec(); - public void setEntireSSTableStreamThroughputMbPerSec(int value); - public int getEntireSSTableStreamThroughputMbPerSec(); + public void setEntireSSTableStreamThroughputMebibytesPerSec(int value); + public 
int getEntireSSTableStreamThroughputMebibytesPerSec(); - public void setEntireSSTableInterDCStreamThroughputMbPerSec(int value); - public int getEntireSSTableInterDCStreamThroughputMbPerSec(); + public void setEntireSSTableInterDCStreamThroughputMebibytesPerSec(int value); + public int getEntireSSTableInterDCStreamThroughputMebibytesPerSec(); public int getCompactionThroughputMbPerSec(); public void setCompactionThroughputMbPerSec(int value); diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java index 59f8821..bec112b 100644 --- a/src/java/org/apache/cassandra/streaming/StreamManager.java +++ b/src/java/org/apache/cassandra/streaming/StreamManager.java @@ -41,7 +41,7 @@ import org.apache.cassandra.streaming.management.StreamStateCompositeData; /** * StreamManager manages currently running {@link StreamResultFuture}s and provides status of all operation invoked. * - * All stream operation should be created through this class to track streaming status and progress. + * All stream operations should be created through this class to track streaming status and progress. */ public class StreamManager implements StreamManagerMBean { @@ -49,7 +49,7 @@ public class StreamManager implements StreamManagerMBean /** * Gets streaming rate limiter. - * When stream_throughput_outbound_megabits_per_sec is 0, this returns rate limiter + * When stream_throughput_outbound is 0, this returns rate limiter * with the rate of Double.MAX_VALUE bytes per second. * Rate unit is bytes per sec. * @@ -60,13 +60,13 @@ public class StreamManager implements StreamManagerMBean return new StreamRateLimiter(peer, StreamRateLimiter.LIMITER, StreamRateLimiter.INTER_DC_LIMITER, - DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec(), - DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec()); + DatabaseDescriptor.getStreamThroughputOutboundMebibytesPerSec(), + DatabaseDescriptor.getInterDCStreamThroughputOutboundMebibytesPerSec()); } /** * Get streaming rate limiter for entire SSTable operations. - * When {@code entire_sstable_stream_throughput_outbound_megabits_per_sec} + * When {@code entire_sstable_stream_throughput_outbound} * is less than or equal ot {@code 0}, this returns rate limiter with the * rate of {@link Double.MAX_VALUE} bytes per second. * Rate unit is bytes per sec. 
@@ -79,13 +79,13 @@ public class StreamManager implements StreamManagerMBean return new StreamRateLimiter(peer, StreamRateLimiter.ENTIRE_SSTABLE_LIMITER, StreamRateLimiter.ENTIRE_SSTABLE_INTER_DC_LIMITER, - DatabaseDescriptor.getEntireSSTableStreamThroughputOutboundMegabitsPerSec(), - DatabaseDescriptor.getEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec()); + DatabaseDescriptor.getEntireSSTableStreamThroughputOutboundMebibytesPerSec(), + DatabaseDescriptor.getEntireSSTableInterDCStreamThroughputOutboundMebibytesPerSec()); } public static class StreamRateLimiter implements StreamingDataOutputPlus.RateLimiter { - public static final double BYTES_PER_MEGABIT = (1000 * 1000) / 8.0; + public static final double BYTES_PER_MEBIBYTE = 1024.0 * 1024.0; private static final RateLimiter LIMITER = RateLimiter.create(calculateRateInBytes()); private static final RateLimiter INTER_DC_LIMITER = RateLimiter.create(calculateInterDCRateInBytes()); private static final RateLimiter ENTIRE_SSTABLE_LIMITER = RateLimiter.create(calculateEntireSSTableRateInBytes()); @@ -94,10 +94,10 @@ public class StreamManager implements StreamManagerMBean private final RateLimiter limiter; private final RateLimiter interDCLimiter; private final boolean isLocalDC; - private final int throughput; - private final int interDCThroughput; + private final double throughput; + private final double interDCThroughput; - private StreamRateLimiter(InetAddressAndPort peer, RateLimiter limiter, RateLimiter interDCLimiter, int throughput, int interDCThroughput) + private StreamRateLimiter(InetAddressAndPort peer, RateLimiter limiter, RateLimiter interDCLimiter, double throughput, double interDCThroughput) { this.limiter = limiter; this.interDCLimiter = interDCLimiter; @@ -148,25 +148,25 @@ public class StreamManager implements StreamManagerMBean private static double calculateRateInBytes() { - int throughput = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec(); + double throughput = DatabaseDescriptor.getStreamThroughputOutboundMebibytesPerSec(); return calculateEffectiveRateInBytes(throughput); } private static double calculateInterDCRateInBytes() { - int throughput = DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec(); + double throughput = DatabaseDescriptor.getInterDCStreamThroughputOutboundMebibytesPerSec(); return calculateEffectiveRateInBytes(throughput); } private static double calculateEntireSSTableRateInBytes() { - int throughput = DatabaseDescriptor.getEntireSSTableStreamThroughputOutboundMegabitsPerSec(); + double throughput = DatabaseDescriptor.getEntireSSTableStreamThroughputOutboundMebibytesPerSec(); return calculateEffectiveRateInBytes(throughput); } private static double calculateEntireSSTableInterDCRateInBytes() { - int throughput = DatabaseDescriptor.getEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec(); + double throughput = DatabaseDescriptor.getEntireSSTableInterDCStreamThroughputOutboundMebibytesPerSec(); return calculateEffectiveRateInBytes(throughput); } @@ -194,11 +194,11 @@ public class StreamManager implements StreamManagerMBean return ENTIRE_SSTABLE_INTER_DC_LIMITER.getRate(); } - private static double calculateEffectiveRateInBytes(int throughput) + private static double calculateEffectiveRateInBytes(double throughput) { - // if throughput is set to 0 or negative value, throttling is disabled + // if throughput is set to 0, throttling is disabled return throughput > 0 - ? throughput * BYTES_PER_MEGABIT + ? 
throughput * BYTES_PER_MEBIBYTE : Double.MAX_VALUE; } } diff --git a/src/java/org/apache/cassandra/streaming/StreamingDataOutputPlus.java b/src/java/org/apache/cassandra/streaming/StreamingDataOutputPlus.java index ab147c6..d845497 100644 --- a/src/java/org/apache/cassandra/streaming/StreamingDataOutputPlus.java +++ b/src/java/org/apache/cassandra/streaming/StreamingDataOutputPlus.java @@ -74,7 +74,7 @@ public interface StreamingDataOutputPlus extends DataOutputPlus, Closeable /** * Writes all data in file channel to stream: <br> * * For zero-copy-streaming, 1MiB at a time, with at most 2MiB in flight at once. <br> - * * For streaming with SSL, 64kb at a time, with at most 32+64kb (default low water mark + batch size) in flight. <br> + * * For streaming with SSL, 64KiB at a time, with at most 32+64KiB (default low water mark + batch size) in flight. <br> * <p> * This method takes ownership of the provided {@link FileChannel}. * <p> diff --git a/src/java/org/apache/cassandra/tools/LoaderOptions.java b/src/java/org/apache/cassandra/tools/LoaderOptions.java index 5334eab..bd680b6 100644 --- a/src/java/org/apache/cassandra/tools/LoaderOptions.java +++ b/src/java/org/apache/cassandra/tools/LoaderOptions.java @@ -402,10 +402,10 @@ public class LoaderOptions { config = new Config(); // unthrottle stream by default - config.stream_throughput_outbound_megabits_per_sec = 0; - config.inter_dc_stream_throughput_outbound_megabits_per_sec = 0; - config.entire_sstable_stream_throughput_outbound_megabits_per_sec = 0; - config.entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec = 0; + config.stream_throughput_outbound = DataRateSpec.inMebibytesPerSecond(0); + config.inter_dc_stream_throughput_outbound = DataRateSpec.inMebibytesPerSecond(0); + config.entire_sstable_stream_throughput_outbound = DataRateSpec.inMebibytesPerSecond(0); + config.entire_sstable_inter_dc_stream_throughput_outbound = DataRateSpec.inMebibytesPerSecond(0); } @@ -460,7 +460,7 @@ public class LoaderOptions sslStoragePort = Integer.parseInt(cmd.getOptionValue(SSL_STORAGE_PORT_OPTION)); else sslStoragePort = config.ssl_storage_port; - throttle = config.stream_throughput_outbound_megabits_per_sec; + throttle = config.stream_throughput_outbound.toMebibytesPerSecondAsInt(); // Copy the encryption options and apply the config so that argument parsing can accesss isEnabled. clientEncOptions = config.client_encryption_options.applyConfig(); serverEncOptions = config.server_encryption_options; @@ -675,7 +675,7 @@ public class LoaderOptions "you will need to have the files Standard1-g-1-Data.db and Standard1-g-1-Index.db into a directory /path/to/Keyspace1/Standard1/."; String footer = System.lineSeparator() + "You can provide cassandra.yaml file with -f command line option to set up streaming throughput, client and server encryption options. " + - "Only stream_throughput_outbound_megabits_per_sec, server_encryption_options and client_encryption_options are read from yaml. " + + "Only stream_throughput_outbound, server_encryption_options and client_encryption_options are read from yaml. 
" + "You can override options read from cassandra.yaml with corresponding command line options."; new HelpFormatter().printHelp(usage, header, options, footer); } diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 73d2cbf..90f8bb3 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -1318,22 +1318,22 @@ public class NodeProbe implements AutoCloseable public int getStreamThroughput() { - return ssProxy.getStreamThroughputMbPerSec(); + return ssProxy.getStreamThroughputMbitPerSec(); } public int getInterDCStreamThroughput() { - return ssProxy.getInterDCStreamThroughputMbPerSec(); + return ssProxy.getInterDCStreamThroughputMbitPerSec(); } public int getEntireSSTableStreamThroughput() { - return ssProxy.getEntireSSTableStreamThroughputMbPerSec(); + return ssProxy.getEntireSSTableStreamThroughputMebibytesPerSec(); } public int getEntireSSTableInterDCStreamThroughput() { - return ssProxy.getEntireSSTableInterDCStreamThroughputMbPerSec(); + return ssProxy.getEntireSSTableInterDCStreamThroughputMebibytesPerSec(); } public double getTraceProbability() @@ -1431,22 +1431,22 @@ public class NodeProbe implements AutoCloseable public void setStreamThroughput(int value) { - ssProxy.setStreamThroughputMbPerSec(value); + ssProxy.setStreamThroughputMbitPerSec(value); } public void setInterDCStreamThroughput(int value) { - ssProxy.setInterDCStreamThroughputMbPerSec(value); + ssProxy.setInterDCStreamThroughputMbitPerSec(value); } public void setEntireSSTableStreamThroughput(int value) { - ssProxy.setEntireSSTableStreamThroughputMbPerSec(value); + ssProxy.setEntireSSTableStreamThroughputMebibytesPerSec(value); } public void setEntireSSTableInterDCStreamThroughput(int value) { - ssProxy.setEntireSSTableInterDCStreamThroughputMbPerSec(value); + ssProxy.setEntireSSTableInterDCStreamThroughputMebibytesPerSec(value); } public void setTraceProbability(double value) diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java index 839c78d..79ccd2e 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java +++ b/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java @@ -22,12 +22,12 @@ import io.airlift.airline.Command; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.NodeTool.NodeToolCmd; -@Command(name = "getcompactionthroughput", description = "Print the MB/s throughput cap for compaction in the system") +@Command(name = "getcompactionthroughput", description = "Print the MiB/s throughput cap for compaction in the system") public class GetCompactionThroughput extends NodeToolCmd { @Override public void execute(NodeProbe probe) { - probe.output().out.println("Current compaction throughput: " + probe.getCompactionThroughput() + " MB/s"); + probe.output().out.println("Current compaction throughput: " + probe.getCompactionThroughput() + " MiB/s"); } } diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetInterDCStreamThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/GetInterDCStreamThroughput.java index 25abb3f..72e5dc1 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/GetInterDCStreamThroughput.java +++ b/src/java/org/apache/cassandra/tools/nodetool/GetInterDCStreamThroughput.java @@ -22,7 +22,7 @@ import io.airlift.airline.Option; import 
org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.NodeTool.NodeToolCmd; -@Command(name = "getinterdcstreamthroughput", description = "Print the Mb/s throughput cap for inter-datacenter streaming and entire SSTable inter-datacenter streaming in the system") +@Command(name = "getinterdcstreamthroughput", description = "Print the throughput cap for inter-datacenter streaming and entire SSTable inter-datacenter streaming in the system") public class GetInterDCStreamThroughput extends NodeToolCmd { @SuppressWarnings("UnusedDeclaration") @@ -36,6 +36,8 @@ public class GetInterDCStreamThroughput extends NodeToolCmd probe.output().out.printf("Current %sinter-datacenter stream throughput: %s%n", entireSSTableThroughput ? "entire SSTable " : "", - throughput > 0 ? throughput + " Mb/s" : "unlimited"); + throughput > 0 ? throughput + + (entireSSTableThroughput ? " MiB/s" : " megabits per second") + : "unlimited"); } } diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetStreamThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/GetStreamThroughput.java index 4c849a9..b5b8cad 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/GetStreamThroughput.java +++ b/src/java/org/apache/cassandra/tools/nodetool/GetStreamThroughput.java @@ -22,7 +22,7 @@ import io.airlift.airline.Option; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.NodeTool.NodeToolCmd; -@Command(name = "getstreamthroughput", description = "Print the Mb/s throughput cap for streaming and entire SSTable streaming in the system") +@Command(name = "getstreamthroughput", description = "Print the throughput cap for streaming and entire SSTable streaming in the system") public class GetStreamThroughput extends NodeToolCmd { @SuppressWarnings("UnusedDeclaration") @@ -36,6 +36,8 @@ public class GetStreamThroughput extends NodeToolCmd probe.output().out.printf("Current %sstream throughput: %s%n", entireSSTableThroughput ? "entire SSTable " : "", - throughput > 0 ? throughput + " Mb/s" : "unlimited"); + throughput > 0 ? throughput + + (entireSSTableThroughput ? 
" MiB/s" : " megabits per second") + : "unlimited"); } } diff --git a/src/java/org/apache/cassandra/tools/nodetool/SetCompactionThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/SetCompactionThroughput.java index 4d01f61..a75aa13 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/SetCompactionThroughput.java +++ b/src/java/org/apache/cassandra/tools/nodetool/SetCompactionThroughput.java @@ -23,10 +23,10 @@ import io.airlift.airline.Command; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.NodeTool.NodeToolCmd; -@Command(name = "setcompactionthroughput", description = "Set the MB/s throughput cap for compaction in the system, or 0 to disable throttling") +@Command(name = "setcompactionthroughput", description = "Set the MiB/s throughput cap for compaction in the system, or 0 to disable throttling") public class SetCompactionThroughput extends NodeToolCmd { - @Arguments(title = "compaction_throughput", usage = "<value_in_mb>", description = "Value in MB, 0 to disable throttling", required = true) + @Arguments(title = "compaction_throughput", usage = "<value_in_mb>", description = "Value in MiB, 0 to disable throttling", required = true) private Integer compactionThroughput = null; @Override diff --git a/src/java/org/apache/cassandra/tools/nodetool/SetInterDCStreamThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/SetInterDCStreamThroughput.java index f454c1b..6ef167e 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/SetInterDCStreamThroughput.java +++ b/src/java/org/apache/cassandra/tools/nodetool/SetInterDCStreamThroughput.java @@ -23,11 +23,11 @@ import io.airlift.airline.Option; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.NodeTool.NodeToolCmd; -@Command(name = "setinterdcstreamthroughput", description = "Set the Mb/s throughput cap for inter-datacenter streaming and entire SSTable inter-datacenter streaming in the system, or 0 to disable throttling") +@Command(name = "setinterdcstreamthroughput", description = "Set the throughput cap for inter-datacenter streaming and entire SSTable inter-datacenter streaming in the system, or 0 to disable throttling") public class SetInterDCStreamThroughput extends NodeToolCmd { @SuppressWarnings("UnusedDeclaration") - @Arguments(title = "inter_dc_stream_throughput", usage = "<value_in_mb>", description = "Value in Mb, 0 to disable throttling", required = true) + @Arguments(title = "inter_dc_stream_throughput", usage = "<value_in_mb>", description = "Value in megabits, 0 to disable throttling", required = true) private int interDCStreamThroughput; @SuppressWarnings("UnusedDeclaration") diff --git a/src/java/org/apache/cassandra/tools/nodetool/SetStreamThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/SetStreamThroughput.java index 5c96a07..e618c78 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/SetStreamThroughput.java +++ b/src/java/org/apache/cassandra/tools/nodetool/SetStreamThroughput.java @@ -23,11 +23,11 @@ import io.airlift.airline.Option; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.NodeTool.NodeToolCmd; -@Command(name = "setstreamthroughput", description = "Set the Mb/s throughput cap for streaming and entire SSTable streaming in the system, or 0 to disable throttling") +@Command(name = "setstreamthroughput", description = "Set throughput cap for streaming and entire SSTable streaming in the system, or 0 to disable throttling") public class SetStreamThroughput extends NodeToolCmd { 
@SuppressWarnings("UnusedDeclaration") - @Arguments(title = "stream_throughput", usage = "<value_in_mb>", description = "Value in Mb, 0 to disable throttling", required = true) + @Arguments(title = "stream_throughput", usage = "<value_in_mb>", description = "Value in megabits, 0 to disable throttling", required = true) private int streamThroughput; @SuppressWarnings("UnusedDeclaration") diff --git a/test/conf/cassandra-murmur.yaml b/test/conf/cassandra-murmur.yaml index 3c263a5..75c1917 100644 --- a/test/conf/cassandra-murmur.yaml +++ b/test/conf/cassandra-murmur.yaml @@ -35,7 +35,7 @@ server_encryption_options: truststore_password: cassandra incremental_backups: true concurrent_compactors: 4 -compaction_throughput_mb_per_sec: 0 +compaction_throughput: 0MiB/s row_cache_class_name: org.apache.cassandra.cache.OHCProvider row_cache_size_in_mb: 16 enable_user_defined_functions: true diff --git a/test/conf/cassandra_deprecated_parameters_names.yaml b/test/conf/cassandra-old.yaml similarity index 98% rename from test/conf/cassandra_deprecated_parameters_names.yaml rename to test/conf/cassandra-old.yaml index 3258b7f..aa5572c 100644 --- a/test/conf/cassandra_deprecated_parameters_names.yaml +++ b/test/conf/cassandra-old.yaml @@ -39,7 +39,7 @@ server_encryption_options: truststore_password: cassandra incremental_backups: true concurrent_compactors: 4 -compaction_throughput_mb_per_sec: 0 +compaction_throughput_mb_per_sec: 64 row_cache_class_name: org.apache.cassandra.cache.OHCProvider row_cache_size_in_mb: 16 enable_user_defined_functions: true diff --git a/test/conf/cassandra-seeds.yaml b/test/conf/cassandra-seeds.yaml index f3279ae..2673461 100644 --- a/test/conf/cassandra-seeds.yaml +++ b/test/conf/cassandra-seeds.yaml @@ -36,7 +36,7 @@ server_encryption_options: truststore_password: cassandra incremental_backups: true concurrent_compactors: 4 -compaction_throughput_mb_per_sec: 0 +compaction_throughput: 0MiB/s row_cache_class_name: org.apache.cassandra.cache.OHCProvider row_cache_size_in_mb: 16 enable_user_defined_functions: true diff --git a/test/conf/cassandra-sslcontextfactory-invalidconfiguration.yaml b/test/conf/cassandra-sslcontextfactory-invalidconfiguration.yaml index c05bf75..15a4c92 100644 --- a/test/conf/cassandra-sslcontextfactory-invalidconfiguration.yaml +++ b/test/conf/cassandra-sslcontextfactory-invalidconfiguration.yaml @@ -68,7 +68,7 @@ server_encryption_options: truststore_password: cassandra incremental_backups: true concurrent_compactors: 4 -compaction_throughput_mb_per_sec: 0 +compaction_throughput: 0MiB/s row_cache_class_name: org.apache.cassandra.cache.OHCProvider row_cache_size_in_mb: 16 enable_user_defined_functions: true @@ -76,7 +76,7 @@ enable_scripted_user_defined_functions: true prepared_statements_cache_size_mb: 1 corrupted_tombstone_strategy: exception stream_entire_sstables: true -stream_throughput_outbound_megabits_per_sec: 200000000 +stream_throughput_outbound: 23841823841858MiB/s enable_sasi_indexes: true enable_materialized_views: true file_cache_enabled: true diff --git a/test/conf/cassandra-sslcontextfactory.yaml b/test/conf/cassandra-sslcontextfactory.yaml index fd17226..3fcb536 100644 --- a/test/conf/cassandra-sslcontextfactory.yaml +++ b/test/conf/cassandra-sslcontextfactory.yaml @@ -71,7 +71,7 @@ server_encryption_options: truststore_password: cassandra incremental_backups: true concurrent_compactors: 4 -compaction_throughput_mb_per_sec: 0 +compaction_throughput: 0MiB/s row_cache_class_name: org.apache.cassandra.cache.OHCProvider row_cache_size_in_mb: 
16 enable_user_defined_functions: true @@ -79,7 +79,7 @@ enable_scripted_user_defined_functions: true prepared_statements_cache_size_mb: 1 corrupted_tombstone_strategy: exception stream_entire_sstables: true -stream_throughput_outbound_megabits_per_sec: 200000000 +stream_throughput_outbound: 23841858MiB/s enable_sasi_indexes: true enable_materialized_views: true file_cache_enabled: true diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml index d556852..3ed919d 100644 --- a/test/conf/cassandra.yaml +++ b/test/conf/cassandra.yaml @@ -39,7 +39,7 @@ server_encryption_options: truststore_password: cassandra incremental_backups: true concurrent_compactors: 4 -compaction_throughput_mb_per_sec: 0 +compaction_throughput: 0MiB/s row_cache_class_name: org.apache.cassandra.cache.OHCProvider row_cache_size_in_mb: 16 enable_user_defined_functions: true @@ -47,7 +47,7 @@ enable_scripted_user_defined_functions: true prepared_statements_cache_size_mb: 1 corrupted_tombstone_strategy: exception stream_entire_sstables: true -stream_throughput_outbound_megabits_per_sec: 200000000 +stream_throughput_outbound: 23841858MiB/s enable_sasi_indexes: true enable_materialized_views: true enable_drop_compact_storage: true diff --git a/test/conf/unit-test-conf/test-native-port.yaml b/test/conf/unit-test-conf/test-native-port.yaml index b46525f..58cd383 100644 --- a/test/conf/unit-test-conf/test-native-port.yaml +++ b/test/conf/unit-test-conf/test-native-port.yaml @@ -47,7 +47,7 @@ enable_scripted_user_defined_functions: true prepared_statements_cache_size_mb: 1 corrupted_tombstone_strategy: exception stream_entire_sstables: true -stream_throughput_outbound_megabits_per_sec: 200000000 +stream_throughput_outbound: 23841858MiB/s client_encryption_options: enabled: true diff --git a/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java index 0bf9b9f..d327fd7 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java +++ b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java @@ -48,10 +48,10 @@ public abstract class AbstractNetstatsBootstrapStreaming extends AbstractNetstat .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(2, "dc0", "rack0")) .withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL) .set(streamEntireSSTables - ? "entire_sstable_stream_throughput_outbound_megabits_per_sec" - : "stream_throughput_outbound_megabits_per_sec", - throughput) - .set("compaction_throughput_mb_per_sec", 1) + ? 
"entire_sstable_stream_throughput_outbound" + : "stream_throughput_outbound", + throughput+"MiB/s") + .set("compaction_throughput", "1MiB/s") .set("stream_entire_sstables", streamEntireSSTables)); try (final Cluster cluster = builder.withNodes(1).start()) diff --git a/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java b/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java index 545461e..8cc8a44 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java @@ -37,6 +37,6 @@ public class NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest extends @Test public void testWithStreamingEntireSSTablesWithoutCompressionWithoutThrottling() throws Exception { - executeTest(true, false, -1); + executeTest(true, false, 0); } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/NetstatsRepairStreamingTest.java b/test/distributed/org/apache/cassandra/distributed/test/NetstatsRepairStreamingTest.java index 36bde63..8e280a3 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/NetstatsRepairStreamingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/NetstatsRepairStreamingTest.java @@ -54,8 +54,8 @@ public class NetstatsRepairStreamingTest extends AbstractNetstatsStreaming try (final Cluster cluster = Cluster.build() .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(2, "dc0", "rack0")) .withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL) - .set("stream_throughput_outbound_megabits_per_sec", 1) - .set("compaction_throughput_mb_per_sec", 1) + .set("stream_throughput_outbound", "122KiB/s") + .set("compaction_throughput", "1MiB/s") .set("stream_entire_sstables", false)).start()) { final IInvokableInstance node1 = cluster.get(1); diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java index c4259fb..19d7709 100644 --- a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java +++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java @@ -59,8 +59,8 @@ public class LongStreamingTest StorageService.instance.initServer(); StorageService.instance.setCompactionThroughputMbPerSec(0); - StorageService.instance.setStreamThroughputMbPerSec(0); - StorageService.instance.setInterDCStreamThroughputMbPerSec(0); + StorageService.instance.setStreamThroughputMbitPerSec(0); + StorageService.instance.setInterDCStreamThroughputMbitPerSec(0); } @Test @@ -109,11 +109,11 @@ public class LongStreamingTest System.err.println(String.format("Writer finished after %d seconds....", TimeUnit.NANOSECONDS.toSeconds(nanoTime() - start))); File[] dataFiles = dataDir.tryList((dir, name) -> name.endsWith("-Data.db")); - long dataSize = 0l; + long dataSizeInBytes = 0l; for (File file : dataFiles) { System.err.println("File : "+file.absolutePath()); - dataSize += file.length(); + dataSizeInBytes += file.length(); } SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client() @@ -137,9 +137,9 @@ public class LongStreamingTest loader.stream().get(); long millis = TimeUnit.NANOSECONDS.toMillis(nanoTime() - start); - System.err.println(String.format("Finished Streaming in %.2f seconds: %.2f Mb/sec", + 
System.err.println(String.format("Finished Streaming in %.2f seconds: %.2f MiBsec", millis/1000d, - (dataSize / (1 << 20) / (millis / 1000d)) * 8)); + (dataSizeInBytes / (1 << 20) / (millis / 1000d)) * 8)); //Stream again @@ -164,9 +164,9 @@ public class LongStreamingTest loader.stream().get(); millis = TimeUnit.NANOSECONDS.toMillis(nanoTime() - start); - System.err.println(String.format("Finished Streaming in %.2f seconds: %.2f Mb/sec", + System.err.println(String.format("Finished Streaming in %.2f seconds: %.2f MiBsec", millis/1000d, - (dataSize / (1 << 20) / (millis / 1000d)) * 8)); + (dataSizeInBytes / (1 << 20) / (millis / 1000d)) * 8)); //Compact them both @@ -174,9 +174,9 @@ public class LongStreamingTest Keyspace.open(KS).getColumnFamilyStore(TABLE).forceMajorCompaction(); millis = TimeUnit.NANOSECONDS.toMillis(nanoTime() - start); - System.err.println(String.format("Finished Compacting in %.2f seconds: %.2f Mb/sec", + System.err.println(String.format("Finished Compacting in %.2f seconds: %.2f MiBsec", millis / 1000d, - (dataSize * 2 / (1 << 20) / (millis / 1000d)) * 8)); + (dataSizeInBytes * 2 / (1 << 20) / (millis / 1000d)) * 8)); UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " + KS + '.' + TABLE + " limit 100;"); assertEquals(100, rs.size()); diff --git a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java index 84c6415..d721298 100644 --- a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java +++ b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java @@ -80,7 +80,7 @@ import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; /** - * Please ensure that this benchmark is run with entire_sstable_stream_throughput_outbound_megabits_per_sec + * Please ensure that this benchmark is run with entire_sstable_stream_throughput_outbound * set to a really high value otherwise, throttling will kick in and the results will not be meaningful. 
*/ @Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS) diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java index 7eb9ed8..dd98509 100644 --- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java +++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java @@ -198,7 +198,19 @@ public class DatabaseDescriptorRefTest "org.apache.cassandra.ConsoleAppenderCustomizer", "org.apache.cassandra.locator.InetAddressAndPort", "org.apache.cassandra.cql3.statements.schema.AlterKeyspaceStatement", - "org.apache.cassandra.cql3.statements.schema.CreateKeyspaceStatement" + "org.apache.cassandra.cql3.statements.schema.CreateKeyspaceStatement", + "org.apache.cassandra.config.DurationSpec", + "org.apache.cassandra.config.DataStorageSpec", + "org.apache.cassandra.config.DataStorageSpec$DataStorageUnit", + "org.apache.cassandra.config.DataStorageSpec$DataStorageUnit$1", + "org.apache.cassandra.config.DataStorageSpec$DataStorageUnit$2", + "org.apache.cassandra.config.DataStorageSpec$DataStorageUnit$3", + "org.apache.cassandra.config.DataStorageSpec$DataStorageUnit$4", + "org.apache.cassandra.config.DataRateSpec", + "org.apache.cassandra.config.DataRateSpec$DataRateUnit", + "org.apache.cassandra.config.DataRateSpec$DataRateUnit$1", + "org.apache.cassandra.config.DataRateSpec$DataRateUnit$2", + "org.apache.cassandra.config.DataRateSpec$DataRateUnit$3" }; static final Set<String> checkedClasses = new HashSet<>(Arrays.asList(validClasses)); diff --git a/test/unit/org/apache/cassandra/config/LoadOldYAMLBackwardCompatibilityTest.java b/test/unit/org/apache/cassandra/config/LoadOldYAMLBackwardCompatibilityTest.java index 575f04d..750939f 100644 --- a/test/unit/org/apache/cassandra/config/LoadOldYAMLBackwardCompatibilityTest.java +++ b/test/unit/org/apache/cassandra/config/LoadOldYAMLBackwardCompatibilityTest.java @@ -28,17 +28,98 @@ public class LoadOldYAMLBackwardCompatibilityTest @BeforeClass public static void setupDatabaseDescriptor() { - System.setProperty("cassandra.config", "cassandra_deprecated_parameters_names.yaml"); + System.setProperty("cassandra.config", "cassandra-old.yaml"); DatabaseDescriptor.daemonInitialization(); } - // CASSANDRA-17141 + // CASSANDRA-15234 @Test public void testConfigurationLoaderBackwardCompatibility() { Config config = DatabaseDescriptor.loadConfig(); - //Confirm parameters were successfully read with the old names from cassandra-old.yaml - assertEquals(5, config.internode_socket_send_buffer_size_in_bytes); + //Confirm parameters were successfully read with their old names and the default values in cassandra-old.yaml + /*assertEquals(5, config.internode_socket_send_buffer_size_in_bytes); assertEquals(5, config.internode_socket_receive_buffer_size_in_bytes); + assertEquals(DurationSpec.inMilliseconds(10800000), config.max_hint_window); + assertEquals(DurationSpec.inMilliseconds(0), config.native_transport_idle_timeout); + assertEquals(DurationSpec.inMilliseconds(10000), config.request_timeout); + assertEquals(DurationSpec.inMilliseconds(5000), config.read_request_timeout); + assertEquals(DurationSpec.inMilliseconds(10000), config.range_request_timeout); + assertEquals(DurationSpec.inMilliseconds(2000), config.write_request_timeout); + assertEquals(DurationSpec.inMilliseconds(5000), config.counter_write_request_timeout); + assertEquals(DurationSpec.inMilliseconds(1000), config.cas_contention_timeout); + 
assertEquals(DurationSpec.inMilliseconds(60000), config.truncate_request_timeout); + assertEquals(DurationSpec.inSeconds(300), config.streaming_keep_alive_period); + assertEquals(DurationSpec.inMilliseconds(500), config.slow_query_log_timeout); + assertNull(config.memtable_heap_space); + assertNull(config.memtable_offheap_space); + assertNull( config.repair_session_space); + assertEquals(DataStorageSpec.inBytes(4194304), config.internode_application_send_queue_capacity); + assertEquals(DataStorageSpec.inBytes(134217728), config.internode_application_send_queue_reserve_endpoint_capacity); + assertEquals(DataStorageSpec.inBytes(536870912), config.internode_application_send_queue_reserve_global_capacity); + assertEquals(DataStorageSpec.inBytes(4194304), config.internode_application_receive_queue_capacity); + assertEquals(DataStorageSpec.inBytes(134217728), config.internode_application_receive_queue_reserve_endpoint_capacity); + assertEquals(DataStorageSpec.inBytes(536870912), config.internode_application_receive_queue_reserve_global_capacity); + assertEquals(DurationSpec.inMilliseconds(2000), config.internode_tcp_connect_timeout); + assertEquals(DurationSpec.inMilliseconds(30000), config.internode_tcp_user_timeout); + assertEquals(DurationSpec.inMilliseconds(300000), config.internode_streaming_tcp_user_timeout); + assertEquals(DataStorageSpec.inMebibytes(16), config.native_transport_max_frame_size); + assertEquals(DataStorageSpec.inMebibytes(256), config.max_value_size); + assertEquals(DataStorageSpec.inKibibytes(64), config.column_index_size); + assertEquals(DataStorageSpec.inKibibytes(2), config.column_index_cache_size); + assertEquals(DataStorageSpec.inKibibytes(5), config.batch_size_warn_threshold);*/ + assertEquals(DataRateSpec.inMebibytesPerSecond(64), config.compaction_throughput); + //assertEquals(DataStorageSpec.inMebibytes(50), config.min_free_space_per_drive); + assertEquals(DataRateSpec.inMebibytesPerSecond(23841858).toString(), config.stream_throughput_outbound.toString()); + assertEquals(DataRateSpec.megabitsPerSecondInMebibytesPerSecond(200000000).toString(), config.stream_throughput_outbound.toString()); + assertEquals(DataRateSpec.inMebibytesPerSecond(24), config.inter_dc_stream_throughput_outbound); + /*assertNull(config.commitlog_total_space); + assertEquals(DurationSpec.inDoubleMilliseconds(0), config.commitlog_sync_group_window); + assertEquals(DurationSpec.inMilliseconds(0), config.commitlog_sync_period); + assertEquals(DataStorageSpec.inMebibytes(32), config.commitlog_segment_size); + assertNull(config.periodic_commitlog_sync_lag_block); //Integer + assertNull(config.max_mutation_size); + assertEquals(DataStorageSpec.inMebibytes(0), config.cdc_total_space); + assertEquals(DurationSpec.inMilliseconds(250), config.cdc_free_space_check_interval); + assertEquals(DurationSpec.inMilliseconds(100), config.dynamic_snitch_update_interval); + assertEquals(DurationSpec.inMilliseconds(600000), config.dynamic_snitch_reset_interval); + assertEquals(DataStorageSpec.inKibibytes(1024), config.hinted_handoff_throttle); + assertEquals(DataStorageSpec.inKibibytes(1024), config.batchlog_replay_throttle); + assertEquals(DurationSpec.inMilliseconds(10000), config.hints_flush_period); + assertEquals(DataStorageSpec.inMebibytes(128), config.max_hints_file_size); + assertEquals(DataStorageSpec.inKibibytes(10240), config.trickle_fsync_interval); + assertEquals(DataStorageSpec.inMebibytes(50), config.sstable_preemptive_open_interval); + assertNull( config.key_cache_size); + 
assertEquals(DataStorageSpec.inMebibytes(0), config.row_cache_size); + assertNull(config.counter_cache_size); + assertNull(config.networking_cache_size); + assertNull(config.file_cache_size); + assertNull(config.index_summary_capacity); + assertEquals(DurationSpec.inMilliseconds(200), config.gc_log_threshold); + assertEquals(DurationSpec.inMilliseconds(1000), config.gc_warn_threshold); + assertEquals(DurationSpec.inSeconds(86400), config.trace_type_query_ttl); + assertEquals(DurationSpec.inSeconds(604800), config.trace_type_repair_ttl); + assertNull(config.prepared_statements_cache_size); + assertFalse(config.user_defined_functions_enabled); + assertFalse(config.scripted_user_defined_functions_enabled); + assertFalse(config.materialized_views_enabled); + assertFalse(config.transient_replication_enabled); + assertFalse(config.sasi_indexes_enabled); + assertFalse(config.drop_compact_storage_enabled); + assertTrue(config.user_defined_functions_threads_enabled); + assertEquals(DurationSpec.inMilliseconds(2000), config.permissions_validity); + assertEquals(DurationSpec.inMilliseconds(0), config.permissions_update_interval); + assertEquals(DurationSpec.inMilliseconds(2000), config.roles_validity); + assertEquals(DurationSpec.inMilliseconds(0), config.roles_update_interval); + assertEquals(DurationSpec.inMilliseconds(2000), config.credentials_validity); + assertEquals(DurationSpec.inMilliseconds(0), config.credentials_update_interval); + assertEquals(DurationSpec.inMinutes(60), config.index_summary_resize_interval); + + //parameters which names have not changed with CASSANDRA-15234 + assertEquals(DurationSpec.inSecondsString("14400"), config.key_cache_save_period); + assertEquals(DurationSpec.inHours(4), config.key_cache_save_period); + assertEquals(DurationSpec.inSecondsString("0"), config.row_cache_save_period); + assertEquals(DurationSpec.inSeconds(0), config.row_cache_save_period); + assertEquals(DurationSpec.inHours(2), config.counter_cache_save_period);*/ } } diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java index 8f1ef69..c6d6390 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java @@ -185,7 +185,7 @@ public class CrcCheckChanceTest extends CQLTester cfs.forceBlockingFlush(); } - DatabaseDescriptor.setCompactionThroughputMbPerSec(1); + DatabaseDescriptor.setCompactionThroughputMebibytesPerSec(1); List<? 
extends Future<?>> futures = CompactionManager.instance.submitMaximal(cfs, CompactionManager.getDefaultGcBefore(cfs, FBUtilities.nowInSeconds()), false); execute("DROP TABLE %s"); diff --git a/test/unit/org/apache/cassandra/net/AsyncStreamingOutputPlusTest.java b/test/unit/org/apache/cassandra/net/AsyncStreamingOutputPlusTest.java index 249850c..4775fd1 100644 --- a/test/unit/org/apache/cassandra/net/AsyncStreamingOutputPlusTest.java +++ b/test/unit/org/apache/cassandra/net/AsyncStreamingOutputPlusTest.java @@ -121,8 +121,8 @@ public class AsyncStreamingOutputPlusTest public void testWriteFileToChannelEntireSSTableNoThrottling() throws IOException { // Disable throttling by setting entire SSTable throughput and entire SSTable inter-DC throughput to 0 - DatabaseDescriptor.setEntireSSTableStreamThroughputOutboundMegabitsPerSec(0); - DatabaseDescriptor.setEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec(0); + DatabaseDescriptor.setEntireSSTableStreamThroughputOutboundMebibytesPerSec(0); + DatabaseDescriptor.setEntireSSTableInterDCStreamThroughputOutboundMebibytesPerSec(0); StreamManager.StreamRateLimiter.updateEntireSSTableThroughput(); StreamManager.StreamRateLimiter.updateEntireSSTableInterDCThroughput(); @@ -133,8 +133,8 @@ public class AsyncStreamingOutputPlusTest public void testWriteFileToChannelEntireSSTable() throws IOException { // Enable entire SSTable throttling by setting it to 200 Mbps - DatabaseDescriptor.setEntireSSTableStreamThroughputOutboundMegabitsPerSec(200); - DatabaseDescriptor.setEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec(200); + DatabaseDescriptor.setEntireSSTableStreamThroughputOutboundMebibytesPerSec(200); + DatabaseDescriptor.setEntireSSTableInterDCStreamThroughputOutboundMebibytesPerSec(200); StreamManager.StreamRateLimiter.updateEntireSSTableThroughput(); StreamManager.StreamRateLimiter.updateEntireSSTableInterDCThroughput(); diff --git a/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java b/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java index 9d669ba..b1168d6 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java @@ -22,29 +22,37 @@ import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DataRateSpec; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.service.StorageService; import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; -import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter.BYTES_PER_MEGABIT; +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter.BYTES_PER_MEBIBYTE; import static org.junit.Assert.assertEquals; public class StreamManagerTest { - private static int defaultStreamThroughputMbPerSec; - private static int defaultInterDCStreamThroughputMbPerSec; + private static double defaultStreamThroughputMebibytesPerSec; + private static double defaultInterDCStreamThroughputMebibytesPerSec; + private static final double INTEGER_MAX_VALUE_MEGABITS_IN_MEBIBYTES = DataRateSpec + .megabitsPerSecondInMebibytesPerSecond(Integer.MAX_VALUE) + .toMebibytesPerSecond(); - private static int defaultEntireSSTableStreamThroughputMbPerSec; - private static int defaultEntireSSTableInterDCStreamThroughputMbPerSec; + private static double defaultEntireSSTableStreamThroughputMebibytesPerSec; + private static double 
defaultEntireSSTableInterDCStreamThroughputMebibytesPerSec; + + final double MEBIBYTES_PER_MEGABIT = 0.119209289550781; @BeforeClass public static void setupClass() { Config c = DatabaseDescriptor.loadConfig(); - defaultStreamThroughputMbPerSec = c.stream_throughput_outbound_megabits_per_sec; - defaultInterDCStreamThroughputMbPerSec = c.inter_dc_stream_throughput_outbound_megabits_per_sec; - defaultEntireSSTableStreamThroughputMbPerSec = c.entire_sstable_stream_throughput_outbound_megabits_per_sec; - defaultEntireSSTableInterDCStreamThroughputMbPerSec = c.entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec; + + defaultStreamThroughputMebibytesPerSec = c.stream_throughput_outbound.toMebibytesPerSecond(); + defaultInterDCStreamThroughputMebibytesPerSec = c.inter_dc_stream_throughput_outbound.toMebibytesPerSecond(); + defaultEntireSSTableStreamThroughputMebibytesPerSec = c.entire_sstable_stream_throughput_outbound.toMebibytesPerSecond(); + defaultEntireSSTableInterDCStreamThroughputMebibytesPerSec = c.entire_sstable_inter_dc_stream_throughput_outbound.toMebibytesPerSecond(); + DatabaseDescriptor.daemonInitialization(() -> c); } @@ -52,22 +60,18 @@ public class StreamManagerTest public void testUpdateStreamThroughput() { // Initialized value check - assertEquals(defaultStreamThroughputMbPerSec * BYTES_PER_MEGABIT, StreamRateLimiter.getRateLimiterRateInBytes(), 0); + assertEquals(defaultStreamThroughputMebibytesPerSec * BYTES_PER_MEBIBYTE, StreamRateLimiter.getRateLimiterRateInBytes(), 0); // Positive value check - StorageService.instance.setStreamThroughputMbPerSec(500); - assertEquals(500.0d * BYTES_PER_MEGABIT, StreamRateLimiter.getRateLimiterRateInBytes(), 0); + StorageService.instance.setStreamThroughputMbitPerSec(500); //60MiB/s + assertEquals(500 * MEBIBYTES_PER_MEGABIT * BYTES_PER_MEBIBYTE, StreamRateLimiter.getRateLimiterRateInBytes(), 0); // Max positive value check - StorageService.instance.setStreamThroughputMbPerSec(Integer.MAX_VALUE); - assertEquals(Integer.MAX_VALUE * BYTES_PER_MEGABIT, StreamRateLimiter.getRateLimiterRateInBytes(), 0); + StorageService.instance.setStreamThroughputMbitPerSec(Integer.MAX_VALUE); + assertEquals(INTEGER_MAX_VALUE_MEGABITS_IN_MEBIBYTES * BYTES_PER_MEBIBYTE, StreamRateLimiter.getRateLimiterRateInBytes(), 0.04); // Zero value check - StorageService.instance.setStreamThroughputMbPerSec(0); - assertEquals(Double.MAX_VALUE, StreamRateLimiter.getRateLimiterRateInBytes(), 0); - - // Negative value check - StorageService.instance.setStreamThroughputMbPerSec(-200); + StorageService.instance.setStreamThroughputMbitPerSec(0); assertEquals(Double.MAX_VALUE, StreamRateLimiter.getRateLimiterRateInBytes(), 0); } @@ -75,22 +79,18 @@ public class StreamManagerTest public void testUpdateEntireSSTableStreamThroughput() { // Initialized value check (defaults to StreamRateLimiter.getRateLimiterRateInBytes()) - assertEquals(defaultEntireSSTableStreamThroughputMbPerSec * BYTES_PER_MEGABIT, StreamRateLimiter.getEntireSSTableRateLimiterRateInBytes(), 0); + assertEquals(defaultEntireSSTableStreamThroughputMebibytesPerSec * BYTES_PER_MEBIBYTE, StreamRateLimiter.getEntireSSTableRateLimiterRateInBytes(), 0); // Positive value check - StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(1500); - assertEquals(1500.0d * BYTES_PER_MEGABIT, StreamRateLimiter.getEntireSSTableRateLimiterRateInBytes(), 0); + StorageService.instance.setEntireSSTableStreamThroughputMebibytesPerSec(1500); + assertEquals(1500d * BYTES_PER_MEBIBYTE, 
Math.round(StreamRateLimiter.getEntireSSTableRateLimiterRateInBytes()), 0); // Max positive value check - StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(Integer.MAX_VALUE); - assertEquals(Integer.MAX_VALUE * BYTES_PER_MEGABIT, StreamRateLimiter.getEntireSSTableRateLimiterRateInBytes(), 0); + StorageService.instance.setEntireSSTableStreamThroughputMebibytesPerSec(Integer.MAX_VALUE); + assertEquals(Integer.MAX_VALUE * BYTES_PER_MEBIBYTE, StreamRateLimiter.getEntireSSTableRateLimiterRateInBytes(), 0); // Zero value check - StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(0); - assertEquals(Double.MAX_VALUE, StreamRateLimiter.getEntireSSTableRateLimiterRateInBytes(), 0); - - // Negative value check - StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(-200); + StorageService.instance.setEntireSSTableStreamThroughputMebibytesPerSec(0); assertEquals(Double.MAX_VALUE, StreamRateLimiter.getEntireSSTableRateLimiterRateInBytes(), 0); } @@ -98,22 +98,18 @@ public class StreamManagerTest public void testUpdateInterDCStreamThroughput() { // Initialized value check - assertEquals(defaultInterDCStreamThroughputMbPerSec * BYTES_PER_MEGABIT, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0); + assertEquals(defaultInterDCStreamThroughputMebibytesPerSec * BYTES_PER_MEBIBYTE, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0); // Positive value check - StorageService.instance.setInterDCStreamThroughputMbPerSec(200); - assertEquals(200.0d * BYTES_PER_MEGABIT, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0); + StorageService.instance.setInterDCStreamThroughputMbitPerSec(200); //approximately 24MiB/s + assertEquals(200 * MEBIBYTES_PER_MEGABIT * BYTES_PER_MEBIBYTE, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0); // Max positive value check - StorageService.instance.setInterDCStreamThroughputMbPerSec(Integer.MAX_VALUE); - assertEquals(Integer.MAX_VALUE * BYTES_PER_MEGABIT, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0); + StorageService.instance.setInterDCStreamThroughputMbitPerSec(Integer.MAX_VALUE); + assertEquals(INTEGER_MAX_VALUE_MEGABITS_IN_MEBIBYTES * BYTES_PER_MEBIBYTE, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0.04); // Zero value check - StorageService.instance.setInterDCStreamThroughputMbPerSec(0); - assertEquals(Double.MAX_VALUE, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0); - - // Negative value check - StorageService.instance.setInterDCStreamThroughputMbPerSec(-200); + StorageService.instance.setInterDCStreamThroughputMbitPerSec(0); assertEquals(Double.MAX_VALUE, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0); } @@ -121,22 +117,18 @@ public class StreamManagerTest public void testUpdateEntireSSTableInterDCStreamThroughput() { // Initialized value check (Defaults to StreamRateLimiter.getInterDCRateLimiterRateInBytes()) - assertEquals(defaultEntireSSTableInterDCStreamThroughputMbPerSec * BYTES_PER_MEGABIT, StreamRateLimiter.getEntireSSTableInterDCRateLimiterRateInBytes(), 0); + assertEquals(defaultEntireSSTableInterDCStreamThroughputMebibytesPerSec * BYTES_PER_MEBIBYTE, StreamRateLimiter.getEntireSSTableInterDCRateLimiterRateInBytes(), 0); // Positive value check - StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(1200); - assertEquals(1200.0d * BYTES_PER_MEGABIT, StreamRateLimiter.getEntireSSTableInterDCRateLimiterRateInBytes(), 0); + StorageService.instance.setEntireSSTableInterDCStreamThroughputMebibytesPerSec(1200); + assertEquals(1200.0d * 
BYTES_PER_MEBIBYTE, StreamRateLimiter.getEntireSSTableInterDCRateLimiterRateInBytes(), 0); // Max positive value check - StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(Integer.MAX_VALUE); - assertEquals(Integer.MAX_VALUE * BYTES_PER_MEGABIT, StreamRateLimiter.getEntireSSTableInterDCRateLimiterRateInBytes(), 0); + StorageService.instance.setEntireSSTableInterDCStreamThroughputMebibytesPerSec(Integer.MAX_VALUE); + assertEquals(Integer.MAX_VALUE * BYTES_PER_MEBIBYTE, StreamRateLimiter.getEntireSSTableInterDCRateLimiterRateInBytes(), 0); // Zero value check - StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(0); - assertEquals(Double.MAX_VALUE, StreamRateLimiter.getEntireSSTableInterDCRateLimiterRateInBytes(), 0); - - // Negative value check - StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(-200); + StorageService.instance.setEntireSSTableInterDCStreamThroughputMebibytesPerSec(0); assertEquals(Double.MAX_VALUE, StreamRateLimiter.getEntireSSTableInterDCRateLimiterRateInBytes(), 0); } -} +} \ No newline at end of file diff --git a/test/unit/org/apache/cassandra/streaming/StreamRateLimiterTest.java b/test/unit/org/apache/cassandra/streaming/StreamRateLimiterTest.java index a518a66..3b72c4d 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamRateLimiterTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamRateLimiterTest.java @@ -47,8 +47,8 @@ public class StreamRateLimiterTest public void testIsRateLimited() { // Enable rate limiting for local traffic and inter-DC traffic - StorageService.instance.setStreamThroughputMbPerSec(200); - StorageService.instance.setInterDCStreamThroughputMbPerSec(200); + StorageService.instance.setStreamThroughputMbitPerSec(200); + StorageService.instance.setInterDCStreamThroughputMbitPerSec(200); // Rate-limiter enabled for a local peer assertTrue(StreamManager.getRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited()); @@ -57,8 +57,8 @@ public class StreamRateLimiterTest assertTrue(StreamManager.getRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited()); // Disable rate limiting for local traffic, but enable it for inter-DC traffic - StorageService.instance.setStreamThroughputMbPerSec(0); - StorageService.instance.setInterDCStreamThroughputMbPerSec(200); + StorageService.instance.setStreamThroughputMbitPerSec(0); + StorageService.instance.setInterDCStreamThroughputMbitPerSec(200); // Rate-limiter disabled for a local peer assertFalse(StreamManager.getRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited()); @@ -67,8 +67,8 @@ public class StreamRateLimiterTest assertTrue(StreamManager.getRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited()); // Enable rate limiting for local traffic, but disable it for inter-DC traffic - StorageService.instance.setStreamThroughputMbPerSec(200); - StorageService.instance.setInterDCStreamThroughputMbPerSec(0); + StorageService.instance.setStreamThroughputMbitPerSec(200); + StorageService.instance.setInterDCStreamThroughputMbitPerSec(0); // Rate-limiter enabled for a local peer assertTrue(StreamManager.getRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited()); @@ -77,8 +77,8 @@ public class StreamRateLimiterTest assertTrue(StreamManager.getRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited()); // Disable rate liming for local and inter-DC traffic - StorageService.instance.setStreamThroughputMbPerSec(0); - StorageService.instance.setInterDCStreamThroughputMbPerSec(-1); + 
StorageService.instance.setStreamThroughputMbitPerSec(0); + StorageService.instance.setInterDCStreamThroughputMbitPerSec(0); // Rate-limiter enabled for a local and remote peers assertFalse(StreamManager.getRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited()); @@ -89,8 +89,8 @@ public class StreamRateLimiterTest public void testEntireSSTableStreamingIsRateLimited() { // Enable rate limiting for local traffic and inter-DC traffic - StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(200); - StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(200); + StorageService.instance.setEntireSSTableStreamThroughputMebibytesPerSec(200); + StorageService.instance.setEntireSSTableInterDCStreamThroughputMebibytesPerSec(200); // Rate-limiter enabled for a local peer assertTrue(StreamManager.getEntireSSTableRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited()); @@ -99,8 +99,8 @@ public class StreamRateLimiterTest assertTrue(StreamManager.getEntireSSTableRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited()); // Disable rate limiting for local traffic, but enable it for inter-DC traffic - StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(0); - StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(200); + StorageService.instance.setEntireSSTableStreamThroughputMebibytesPerSec(0); + StorageService.instance.setEntireSSTableInterDCStreamThroughputMebibytesPerSec(200); // Rate-limiter disabled for a local peer assertFalse(StreamManager.getEntireSSTableRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited()); @@ -109,8 +109,8 @@ public class StreamRateLimiterTest assertTrue(StreamManager.getEntireSSTableRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited()); // Enable rate limiting for local traffic, but disable it for inter-DC traffic - StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(200); - StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(0); + StorageService.instance.setEntireSSTableStreamThroughputMebibytesPerSec(200); + StorageService.instance.setEntireSSTableInterDCStreamThroughputMebibytesPerSec(0); // Rate-limiter enabled for a local peer assertTrue(StreamManager.getEntireSSTableRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited()); @@ -119,8 +119,8 @@ public class StreamRateLimiterTest assertTrue(StreamManager.getEntireSSTableRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited()); // Disable rate liming for local and inter-DC traffic - StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(0); - StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(-1); + StorageService.instance.setEntireSSTableStreamThroughputMebibytesPerSec(0); + StorageService.instance.setEntireSSTableInterDCStreamThroughputMebibytesPerSec(0); // Rate-limiter enabled for a local and remote peers assertFalse(StreamManager.getEntireSSTableRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited()); diff --git a/test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java b/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java similarity index 61% copy from test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java copy to test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java index ecf01b1..604cd7b 100644 --- a/test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java +++ 
b/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java @@ -23,17 +23,14 @@ import org.junit.Test; import org.apache.cassandra.cql3.CQLTester; -import static org.assertj.core.api.Assertions.withPrecision; - -import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; import static org.apache.cassandra.tools.ToolRunner.ToolResult; import static org.apache.cassandra.tools.ToolRunner.invokeNodetool; import static org.assertj.core.api.Assertions.assertThat; /** - * Tests for {@code nodetool setstreamthroughput} and {@code nodetool getstreamthroughput}. + * Tests for {@code nodetool setcompactionthroughput} and {@code nodetool getcompactionthroughput}. */ -public class SetGetStreamThroughputTest extends CQLTester +public class SetGetCompactionThroughputTest extends CQLTester { @BeforeClass public static void setup() throws Exception @@ -44,67 +41,59 @@ public class SetGetStreamThroughputTest extends CQLTester @Test public void testNull() { - assertSetInvalidThroughput(null, "Required parameters are missing: stream_throughput"); + assertSetInvalidThroughput(null, "Required parameters are missing: compaction_throughput"); } @Test public void testPositive() { - assertSetGetValidThroughput(7, 7 * StreamRateLimiter.BYTES_PER_MEGABIT); + assertSetGetValidThroughput(7); } @Test public void testMaxValue() { - assertSetGetValidThroughput(Integer.MAX_VALUE, Integer.MAX_VALUE * StreamRateLimiter.BYTES_PER_MEGABIT); + assertSetGetValidThroughput(Integer.MAX_VALUE); } @Test public void testZero() { - assertSetGetValidThroughput(0, Double.MAX_VALUE); - } - - @Test - public void testNegative() - { - assertSetGetValidThroughput(-7, Double.MAX_VALUE); + assertSetGetValidThroughput(0); } @Test public void testUnparseable() { - assertSetInvalidThroughput("1.2", "stream_throughput: can not convert \"1.2\" to a int"); - assertSetInvalidThroughput("value", "stream_throughput: can not convert \"value\" to a int"); + assertSetInvalidThroughput("1.2", "compaction_throughput: can not convert \"1.2\" to a Integer"); + assertSetInvalidThroughput("value", "compaction_throughput: can not convert \"value\" to a Integer"); } - private static void assertSetGetValidThroughput(int throughput, double rateInBytes) + private static void assertSetGetValidThroughput(int throughput) { - ToolResult tool = invokeNodetool("setstreamthroughput", String.valueOf(throughput)); + ToolResult tool = invokeNodetool("setcompactionthroughput", String.valueOf(throughput)); tool.assertOnCleanExit(); assertThat(tool.getStdout()).isEmpty(); assertGetThroughput(throughput); - - assertThat(StreamRateLimiter.getRateLimiterRateInBytes()).isEqualTo(rateInBytes, withPrecision(0.01)); } private static void assertSetInvalidThroughput(String throughput, String expectedErrorMessage) { - ToolResult tool = throughput == null ? invokeNodetool("setstreamthroughput") - : invokeNodetool("setstreamthroughput", throughput); + ToolResult tool = throughput == null ? 
invokeNodetool("setcompactionthroughput") + : invokeNodetool("setcompactionthroughput", throughput); assertThat(tool.getExitCode()).isEqualTo(1); assertThat(tool.getStdout()).contains(expectedErrorMessage); } private static void assertGetThroughput(int expected) { - ToolResult tool = invokeNodetool("getstreamthroughput"); + ToolResult tool = invokeNodetool("getcompactionthroughput"); tool.assertOnCleanExit(); if (expected > 0) - assertThat(tool.getStdout()).contains("Current stream throughput: " + expected + " Mb/s"); + assertThat(tool.getStdout()).contains("Current compaction throughput: " + expected + " MiB/s"); else - assertThat(tool.getStdout()).contains("Current stream throughput: unlimited"); + assertThat(tool.getStdout()).contains("Current compaction throughput: 0 MiB/s"); } } diff --git a/test/unit/org/apache/cassandra/tools/nodetool/SetGetEntireSSTableInterDCStreamThroughputTest.java b/test/unit/org/apache/cassandra/tools/nodetool/SetGetEntireSSTableInterDCStreamThroughputTest.java index 3f32ac0..d412ab7 100644 --- a/test/unit/org/apache/cassandra/tools/nodetool/SetGetEntireSSTableInterDCStreamThroughputTest.java +++ b/test/unit/org/apache/cassandra/tools/nodetool/SetGetEntireSSTableInterDCStreamThroughputTest.java @@ -49,13 +49,13 @@ public class SetGetEntireSSTableInterDCStreamThroughputTest extends CQLTester @Test public void testPositive() { - assertSetGetValidThroughput(7, 7 * StreamRateLimiter.BYTES_PER_MEGABIT); + assertSetGetValidThroughput(7, 7 * StreamRateLimiter.BYTES_PER_MEBIBYTE); } @Test public void testMaxValue() { - assertSetGetValidThroughput(Integer.MAX_VALUE, Integer.MAX_VALUE * StreamRateLimiter.BYTES_PER_MEGABIT); + assertSetGetValidThroughput(Integer.MAX_VALUE, Integer.MAX_VALUE * StreamRateLimiter.BYTES_PER_MEBIBYTE); } @Test @@ -65,12 +65,6 @@ public class SetGetEntireSSTableInterDCStreamThroughputTest extends CQLTester } @Test - public void testNegative() - { - assertSetGetValidThroughput(-7, Double.MAX_VALUE); - } - - @Test public void testUnparseable() { assertSetInvalidThroughput("1.2", "inter_dc_stream_throughput: can not convert \"1.2\" to a int"); @@ -102,7 +96,7 @@ public class SetGetEntireSSTableInterDCStreamThroughputTest extends CQLTester tool.assertOnCleanExit(); if (expected > 0) - assertThat(tool.getStdout()).contains("Current entire SSTable inter-datacenter stream throughput: " + expected + " Mb/s"); + assertThat(tool.getStdout()).contains("Current entire SSTable inter-datacenter stream throughput: " + expected + " MiB/s"); else assertThat(tool.getStdout()).contains("Current entire SSTable inter-datacenter stream throughput: unlimited"); } diff --git a/test/unit/org/apache/cassandra/tools/nodetool/SetGetEntireSSTableStreamThroughputTest.java b/test/unit/org/apache/cassandra/tools/nodetool/SetGetEntireSSTableStreamThroughputTest.java index 7250b5f..10426b0 100644 --- a/test/unit/org/apache/cassandra/tools/nodetool/SetGetEntireSSTableStreamThroughputTest.java +++ b/test/unit/org/apache/cassandra/tools/nodetool/SetGetEntireSSTableStreamThroughputTest.java @@ -49,13 +49,13 @@ public class SetGetEntireSSTableStreamThroughputTest extends CQLTester @Test public void testPositive() { - assertSetGetValidThroughput(7, 7 * StreamRateLimiter.BYTES_PER_MEGABIT); + assertSetGetValidThroughput(7, 7 * StreamRateLimiter.BYTES_PER_MEBIBYTE); } @Test public void testMaxValue() { - assertSetGetValidThroughput(Integer.MAX_VALUE, Integer.MAX_VALUE * StreamRateLimiter.BYTES_PER_MEGABIT); + assertSetGetValidThroughput(Integer.MAX_VALUE, Integer.MAX_VALUE * 
StreamRateLimiter.BYTES_PER_MEBIBYTE); } @Test @@ -65,12 +65,6 @@ public class SetGetEntireSSTableStreamThroughputTest extends CQLTester } @Test - public void testNegative() - { - assertSetGetValidThroughput(-7, Double.MAX_VALUE); - } - - @Test public void testUnparseable() { assertSetInvalidThroughput("1.2", "stream_throughput: can not convert \"1.2\" to a int"); @@ -107,7 +101,7 @@ public class SetGetEntireSSTableStreamThroughputTest extends CQLTester tool.assertOnCleanExit(); if (expected > 0) - assertThat(tool.getStdout()).contains("Current entire SSTable stream throughput: " + expected + " Mb/s"); + assertThat(tool.getStdout()).contains("Current entire SSTable stream throughput: " + expected + " MiB/s"); else assertThat(tool.getStdout()).contains("Current entire SSTable stream throughput: unlimited"); } diff --git a/test/unit/org/apache/cassandra/tools/nodetool/SetGetInterDCStreamThroughputTest.java b/test/unit/org/apache/cassandra/tools/nodetool/SetGetInterDCStreamThroughputTest.java index b786e5d..6b06c69 100644 --- a/test/unit/org/apache/cassandra/tools/nodetool/SetGetInterDCStreamThroughputTest.java +++ b/test/unit/org/apache/cassandra/tools/nodetool/SetGetInterDCStreamThroughputTest.java @@ -21,6 +21,7 @@ package org.apache.cassandra.tools.nodetool; import org.junit.BeforeClass; import org.junit.Test; +import org.apache.cassandra.config.DataRateSpec; import org.apache.cassandra.cql3.CQLTester; import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; @@ -34,6 +35,10 @@ import static org.assertj.core.api.Assertions.withPrecision; */ public class SetGetInterDCStreamThroughputTest extends CQLTester { + private static final double INTEGER_MAX_VALUE_MEGABITS_IN_MEBIBYTES = DataRateSpec + .megabitsPerSecondInMebibytesPerSecond(Integer.MAX_VALUE) + .toMebibytesPerSecond(); + @BeforeClass public static void setup() throws Exception { @@ -49,25 +54,28 @@ public class SetGetInterDCStreamThroughputTest extends CQLTester @Test public void testPositive() { - assertSetGetValidThroughput(7, 7 * StreamRateLimiter.BYTES_PER_MEGABIT); + assertSetGetValidThroughput(7, 0.834465026855467 * StreamRateLimiter.BYTES_PER_MEBIBYTE); } @Test - public void testMaxValue() + public void testSmallPositive() { - assertSetGetValidThroughput(Integer.MAX_VALUE, Integer.MAX_VALUE * StreamRateLimiter.BYTES_PER_MEGABIT); + // As part of CASSANDRA-15234 we had to make some precision tweaks. This test is meant to ensure no regressions + // happen. Internally, data rate parameter values and the rate limiter are stored as doubles.
Users can set + and get only integers. + assertSetGetValidThroughput(1, 0.119209289550781 * StreamRateLimiter.BYTES_PER_MEBIBYTE); } @Test - public void testZero() + public void testMaxValue() { - assertSetGetValidThroughput(0, Double.MAX_VALUE); + assertSetGetValidThroughput(Integer.MAX_VALUE, INTEGER_MAX_VALUE_MEGABITS_IN_MEBIBYTES * StreamRateLimiter.BYTES_PER_MEBIBYTE); } @Test - public void testNegative() + public void testZero() { - assertSetGetValidThroughput(-7, Double.MAX_VALUE); + assertSetGetValidThroughput(0, Double.MAX_VALUE); } @Test @@ -85,7 +93,7 @@ public class SetGetInterDCStreamThroughputTest extends CQLTester assertGetThroughput(throughput); - assertThat(StreamRateLimiter.getInterDCRateLimiterRateInBytes()).isEqualTo(rateInBytes, withPrecision(0.01)); + assertThat(StreamRateLimiter.getInterDCRateLimiterRateInBytes()).isEqualTo(rateInBytes, withPrecision(0.04)); } private static void assertSetInvalidThroughput(String throughput, String expectedErrorMessage) @@ -102,7 +110,7 @@ public class SetGetInterDCStreamThroughputTest extends CQLTester tool.assertOnCleanExit(); if (expected > 0) - assertThat(tool.getStdout()).contains("Current inter-datacenter stream throughput: " + expected + " Mb/s"); + assertThat(tool.getStdout()).contains("Current inter-datacenter stream throughput: " + expected + " megabits per second"); else assertThat(tool.getStdout()).contains("Current inter-datacenter stream throughput: unlimited"); } diff --git a/test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java b/test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java index ecf01b1..7000387 100644 --- a/test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java +++ b/test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java @@ -21,6 +21,7 @@ package org.apache.cassandra.tools.nodetool; import org.junit.BeforeClass; import org.junit.Test; +import org.apache.cassandra.config.DataRateSpec; import org.apache.cassandra.cql3.CQLTester; import static org.assertj.core.api.Assertions.withPrecision; @@ -35,6 +36,10 @@ import static org.assertj.core.api.Assertions.assertThat; */ public class SetGetStreamThroughputTest extends CQLTester { + private static final double INTEGER_MAX_VALUE_MEGABITS_IN_MEBIBYTES = DataRateSpec + .megabitsPerSecondInMebibytesPerSecond(Integer.MAX_VALUE) + .toMebibytesPerSecond(); + @BeforeClass public static void setup() throws Exception { @@ -50,25 +55,28 @@ public class SetGetStreamThroughputTest extends CQLTester @Test public void testPositive() { - assertSetGetValidThroughput(7, 7 * StreamRateLimiter.BYTES_PER_MEGABIT); + assertSetGetValidThroughput(7, 0.834465026855467 * StreamRateLimiter.BYTES_PER_MEBIBYTE); } @Test - public void testMaxValue() + public void testSmallPositive() { - assertSetGetValidThroughput(Integer.MAX_VALUE, Integer.MAX_VALUE * StreamRateLimiter.BYTES_PER_MEGABIT); + // As part of CASSANDRA-15234 we had to make some precision tweaks. This test is meant to ensure no regressions + // happen. Internally, data rate parameter values and the rate limiter are stored as doubles.
Users can set + // and get only integers + assertSetGetValidThroughput(1, 0.119209289550781 * StreamRateLimiter.BYTES_PER_MEBIBYTE); } @Test - public void testZero() + public void testMaxValue() { - assertSetGetValidThroughput(0, Double.MAX_VALUE); + assertSetGetValidThroughput(Integer.MAX_VALUE, INTEGER_MAX_VALUE_MEGABITS_IN_MEBIBYTES * StreamRateLimiter.BYTES_PER_MEBIBYTE); } @Test - public void testNegative() + public void testZero() { - assertSetGetValidThroughput(-7, Double.MAX_VALUE); + assertSetGetValidThroughput(0, Double.MAX_VALUE); } @Test @@ -86,7 +94,7 @@ public class SetGetStreamThroughputTest extends CQLTester assertGetThroughput(throughput); - assertThat(StreamRateLimiter.getRateLimiterRateInBytes()).isEqualTo(rateInBytes, withPrecision(0.01)); + assertThat(StreamRateLimiter.getRateLimiterRateInBytes()).isEqualTo(rateInBytes, withPrecision(0.04)); } private static void assertSetInvalidThroughput(String throughput, String expectedErrorMessage) @@ -103,7 +111,7 @@ public class SetGetStreamThroughputTest extends CQLTester tool.assertOnCleanExit(); if (expected > 0) - assertThat(tool.getStdout()).contains("Current stream throughput: " + expected + " Mb/s"); + assertThat(tool.getStdout()).contains("Current stream throughput: " + expected + " megabits per second"); else assertThat(tool.getStdout()).contains("Current stream throughput: unlimited"); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org
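P.S. For anyone reviewing the throughput unit changes above: the constants asserted in the updated tests (0.119209289550781, 0.834465026855467, 23841858) all fall out of the same conversion, 1 Mbit/s = 1,000,000 / (8 * 1024 * 1024) MiB/s. The standalone sketch below is illustrative only; it is not part of the patch and its class and method names are made up, but it reproduces the values that the tests expect DataRateSpec.megabitsPerSecondInMebibytesPerSecond() to yield.

public class MegabitToMebibyteSketch
{
    // 1 megabit = 1,000,000 bits = 125,000 bytes; 1 MiB = 1,048,576 bytes
    private static final double MEBIBYTES_PER_MEGABIT = 125_000d / 1_048_576d; // 0.11920928955078125

    static double megabitsToMebibytesPerSecond(double megabitsPerSecond)
    {
        return megabitsPerSecond * MEBIBYTES_PER_MEGABIT;
    }

    public static void main(String[] args)
    {
        System.out.println(megabitsToMebibytesPerSecond(1));           // ~0.119209, the testSmallPositive expectation
        System.out.println(megabitsToMebibytesPerSecond(7));           // ~0.834465, the testPositive expectation
        System.out.println(megabitsToMebibytesPerSecond(200));         // ~23.84, the "approximately 24MiB/s" noted in StreamManagerTest
        System.out.println(megabitsToMebibytesPerSecond(200_000_000)); // ~23841857.9, matching the 23841858 MiB/s assert in LoadOldYAMLBackwardCompatibilityTest
    }
}

The looser withPrecision(0.04) and the 0.04 deltas in StreamManagerTest presumably absorb the rounding introduced when a user-facing integer megabit value is converted to a double mebibyte rate and back.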