This is an automated email from the ASF dual-hosted git repository. adelapena pushed a commit to branch cassandra-3.11 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit c6e897d2d43bd8c2dff9553cee466231247b9840 Merge: 9bf14a6 4f8afe8 Author: Andrés de la Peña <a.penya.gar...@gmail.com> AuthorDate: Fri Oct 1 11:52:07 2021 +0100 Merge branch 'cassandra-3.0' into cassandra-3.11 CHANGES.txt | 1 + .../apache/cassandra/service/StorageService.java | 8 +- .../apache/cassandra/streaming/StreamManager.java | 58 +++++++++----- .../tools/nodetool/SetInterDCStreamThroughput.java | 3 +- .../tools/nodetool/SetStreamThroughput.java | 3 +- .../cassandra/streaming/StreamManagerTest.java | 91 ++++++++++++++++++++++ 6 files changed, 142 insertions(+), 22 deletions(-) diff --cc CHANGES.txt index 84c5bcf,53858ac..37ec9c1 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,15 -1,5 +1,16 @@@ -3.0.26: +3.11.12 + * Add key validation to ssstablescrub (CASSANDRA-16969) + * Update Jackson from 2.9.10 to 2.12.5 (CASSANDRA-16851) + * Include SASI components to snapshots (CASSANDRA-15134) + * Make assassinate more resilient to missing tokens (CASSANDRA-16847) + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854) + * Validate SASI tokenizer options before adding index to schema (CASSANDRA-15135) + * Fixup scrub output when no data post-scrub and clear up old use of row, which really means partition (CASSANDRA-16835) + * Fix ant-junit dependency issue (CASSANDRA-16827) + * Reduce thread contention in CommitLogSegment and HintsBuffer (CASSANDRA-16072) + * Avoid sending CDC column if not enabled (CASSANDRA-16770) +Merged from 3.0: + * Immediately apply stream throughput, considering negative values as unthrottled (CASSANDRA-16959) * Do not release new SSTables in offline transactions (CASSANDRA-16975) * ArrayIndexOutOfBoundsException in FunctionResource#fromName (CASSANDRA-16977, CASSANDRA-16995) * CVE-2015-0886 Security vulnerability in jbcrypt is addressed (CASSANDRA-9384) diff --cc src/java/org/apache/cassandra/service/StorageService.java index 9cfdd80,78d1120..1a0d5b0 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -1390,98 -1257,12 +1390,100 @@@ public class StorageService extends Not } } + public void setRpcTimeout(long value) + { + DatabaseDescriptor.setRpcTimeout(value); + logger.info("set rpc timeout to {} ms", value); + } + + public long getRpcTimeout() + { + return DatabaseDescriptor.getRpcTimeout(); + } + + public void setReadRpcTimeout(long value) + { + DatabaseDescriptor.setReadRpcTimeout(value); + logger.info("set read rpc timeout to {} ms", value); + } + + public long getReadRpcTimeout() + { + return DatabaseDescriptor.getReadRpcTimeout(); + } + + public void setRangeRpcTimeout(long value) + { + DatabaseDescriptor.setRangeRpcTimeout(value); + logger.info("set range rpc timeout to {} ms", value); + } + + public long getRangeRpcTimeout() + { + return DatabaseDescriptor.getRangeRpcTimeout(); + } + + public void setWriteRpcTimeout(long value) + { + DatabaseDescriptor.setWriteRpcTimeout(value); + logger.info("set write rpc timeout to {} ms", value); + } + + public long getWriteRpcTimeout() + { + return DatabaseDescriptor.getWriteRpcTimeout(); + } + + public void setCounterWriteRpcTimeout(long value) + { + DatabaseDescriptor.setCounterWriteRpcTimeout(value); + logger.info("set counter write rpc timeout to {} ms", value); + } + + public long getCounterWriteRpcTimeout() + { + return DatabaseDescriptor.getCounterWriteRpcTimeout(); + } + + public void setCasContentionTimeout(long value) + { + DatabaseDescriptor.setCasContentionTimeout(value); + logger.info("set cas contention rpc timeout to {} ms", value); + } + + public long getCasContentionTimeout() + { + return DatabaseDescriptor.getCasContentionTimeout(); + } + + public void setTruncateRpcTimeout(long value) + { + DatabaseDescriptor.setTruncateRpcTimeout(value); + logger.info("set truncate rpc timeout to {} ms", value); + } + + public long getTruncateRpcTimeout() + { + return DatabaseDescriptor.getTruncateRpcTimeout(); + } + + public void setStreamingSocketTimeout(int value) + { + DatabaseDescriptor.setStreamingSocketTimeout(value); + logger.info("set streaming socket timeout to {} ms", value); + } + + public int getStreamingSocketTimeout() + { + return DatabaseDescriptor.getStreamingSocketTimeout(); + } + public void setStreamThroughputMbPerSec(int value) { + int oldValue = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec(); DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(value); - logger.info("setstreamthroughput: throttle set to {}", value); + StreamManager.StreamRateLimiter.updateThroughput(); + logger.info("setstreamthroughput: throttle set to {} Mb/s (was {} Mb/s)", value, oldValue); } public int getStreamThroughputMbPerSec() diff --cc test/unit/org/apache/cassandra/streaming/StreamManagerTest.java index 0000000,69db960..625d9d5 mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java @@@ -1,0 -1,90 +1,91 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.streaming; + + import org.junit.BeforeClass; + import org.junit.Test; + + import org.apache.cassandra.config.Config; + import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.service.StorageService; + + import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; + import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter.BYTES_PER_MEGABIT; + import static org.junit.Assert.assertEquals; + + public class StreamManagerTest + { + private static int defaultStreamThroughputMbPerSec; + private static int defaultInterDCStreamThroughputMbPerSec; + + @BeforeClass + public static void setupClass() + { + Config c = DatabaseDescriptor.loadConfig(); + defaultStreamThroughputMbPerSec = c.stream_throughput_outbound_megabits_per_sec; + defaultInterDCStreamThroughputMbPerSec = c.inter_dc_stream_throughput_outbound_megabits_per_sec; ++ DatabaseDescriptor.daemonInitialization(() -> c); + } + + @Test + public void testUpdateStreamThroughput() + { + // Initialized value check + assertEquals(defaultStreamThroughputMbPerSec * BYTES_PER_MEGABIT, StreamRateLimiter.getRateLimiterRateInBytes(), 0); + + // Positive value check + StorageService.instance.setStreamThroughputMbPerSec(500); + assertEquals(500.0d * BYTES_PER_MEGABIT, StreamRateLimiter.getRateLimiterRateInBytes(), 0); + + // Max positive value check + StorageService.instance.setStreamThroughputMbPerSec(Integer.MAX_VALUE); + assertEquals(Integer.MAX_VALUE * BYTES_PER_MEGABIT, StreamRateLimiter.getRateLimiterRateInBytes(), 0); + + // Zero value check + StorageService.instance.setStreamThroughputMbPerSec(0); + assertEquals(Double.MAX_VALUE, StreamRateLimiter.getRateLimiterRateInBytes(), 0); + + // Negative value check + StorageService.instance.setStreamThroughputMbPerSec(-200); + assertEquals(Double.MAX_VALUE, StreamRateLimiter.getRateLimiterRateInBytes(), 0); + } + + @Test + public void testUpdateInterDCStreamThroughput() + { + // Initialized value check + assertEquals(defaultInterDCStreamThroughputMbPerSec * BYTES_PER_MEGABIT, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0); + + // Positive value check + StorageService.instance.setInterDCStreamThroughputMbPerSec(200); + assertEquals(200.0d * BYTES_PER_MEGABIT, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0); + + // Max positive value check + StorageService.instance.setInterDCStreamThroughputMbPerSec(Integer.MAX_VALUE); + assertEquals(Integer.MAX_VALUE * BYTES_PER_MEGABIT, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0); + + // Zero value check + StorageService.instance.setInterDCStreamThroughputMbPerSec(0); + assertEquals(Double.MAX_VALUE, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0); + + // Negative value check + StorageService.instance.setInterDCStreamThroughputMbPerSec(-200); + assertEquals(Double.MAX_VALUE, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org