This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit c6e897d2d43bd8c2dff9553cee466231247b9840
Merge: 9bf14a6 4f8afe8
Author: Andrés de la Peña <a.penya.gar...@gmail.com>
AuthorDate: Fri Oct 1 11:52:07 2021 +0100

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |  1 +
 .../apache/cassandra/service/StorageService.java   |  8 +-
 .../apache/cassandra/streaming/StreamManager.java  | 58 +++++++++-----
 .../tools/nodetool/SetInterDCStreamThroughput.java |  3 +-
 .../tools/nodetool/SetStreamThroughput.java        |  3 +-
 .../cassandra/streaming/StreamManagerTest.java     | 91 ++++++++++++++++++++++
 6 files changed, 142 insertions(+), 22 deletions(-)

diff --cc CHANGES.txt
index 84c5bcf,53858ac..37ec9c1
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,15 -1,5 +1,16 @@@
 -3.0.26:
 +3.11.12
 + * Add key validation to ssstablescrub (CASSANDRA-16969)
 + * Update Jackson from 2.9.10 to 2.12.5 (CASSANDRA-16851)
 + * Include SASI components to snapshots (CASSANDRA-15134)
 + * Make assassinate more resilient to missing tokens (CASSANDRA-16847)
 + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies 
(CASSANDRA-16854)
 + * Validate SASI tokenizer options before adding index to schema 
(CASSANDRA-15135)
 + * Fixup scrub output when no data post-scrub and clear up old use of row, 
which really means partition (CASSANDRA-16835)
 + * Fix ant-junit dependency issue (CASSANDRA-16827)
 + * Reduce thread contention in CommitLogSegment and HintsBuffer 
(CASSANDRA-16072)
 + * Avoid sending CDC column if not enabled (CASSANDRA-16770)
 +Merged from 3.0:
+  * Immediately apply stream throughput, considering negative values as 
unthrottled (CASSANDRA-16959)
   * Do not release new SSTables in offline transactions (CASSANDRA-16975)
   * ArrayIndexOutOfBoundsException in FunctionResource#fromName 
(CASSANDRA-16977, CASSANDRA-16995)
   * CVE-2015-0886 Security vulnerability in jbcrypt is addressed 
(CASSANDRA-9384)
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 9cfdd80,78d1120..1a0d5b0
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -1390,98 -1257,12 +1390,100 @@@ public class StorageService extends Not
          }
      }
  
 +    public void setRpcTimeout(long value)
 +    {
 +        DatabaseDescriptor.setRpcTimeout(value);
 +        logger.info("set rpc timeout to {} ms", value);
 +    }
 +
 +    public long getRpcTimeout()
 +    {
 +        return DatabaseDescriptor.getRpcTimeout();
 +    }
 +
 +    public void setReadRpcTimeout(long value)
 +    {
 +        DatabaseDescriptor.setReadRpcTimeout(value);
 +        logger.info("set read rpc timeout to {} ms", value);
 +    }
 +
 +    public long getReadRpcTimeout()
 +    {
 +        return DatabaseDescriptor.getReadRpcTimeout();
 +    }
 +
 +    public void setRangeRpcTimeout(long value)
 +    {
 +        DatabaseDescriptor.setRangeRpcTimeout(value);
 +        logger.info("set range rpc timeout to {} ms", value);
 +    }
 +
 +    public long getRangeRpcTimeout()
 +    {
 +        return DatabaseDescriptor.getRangeRpcTimeout();
 +    }
 +
 +    public void setWriteRpcTimeout(long value)
 +    {
 +        DatabaseDescriptor.setWriteRpcTimeout(value);
 +        logger.info("set write rpc timeout to {} ms", value);
 +    }
 +
 +    public long getWriteRpcTimeout()
 +    {
 +        return DatabaseDescriptor.getWriteRpcTimeout();
 +    }
 +
 +    public void setCounterWriteRpcTimeout(long value)
 +    {
 +        DatabaseDescriptor.setCounterWriteRpcTimeout(value);
 +        logger.info("set counter write rpc timeout to {} ms", value);
 +    }
 +
 +    public long getCounterWriteRpcTimeout()
 +    {
 +        return DatabaseDescriptor.getCounterWriteRpcTimeout();
 +    }
 +
 +    public void setCasContentionTimeout(long value)
 +    {
 +        DatabaseDescriptor.setCasContentionTimeout(value);
 +        logger.info("set cas contention rpc timeout to {} ms", value);
 +    }
 +
 +    public long getCasContentionTimeout()
 +    {
 +        return DatabaseDescriptor.getCasContentionTimeout();
 +    }
 +
 +    public void setTruncateRpcTimeout(long value)
 +    {
 +        DatabaseDescriptor.setTruncateRpcTimeout(value);
 +        logger.info("set truncate rpc timeout to {} ms", value);
 +    }
 +
 +    public long getTruncateRpcTimeout()
 +    {
 +        return DatabaseDescriptor.getTruncateRpcTimeout();
 +    }
 +
 +    public void setStreamingSocketTimeout(int value)
 +    {
 +        DatabaseDescriptor.setStreamingSocketTimeout(value);
 +        logger.info("set streaming socket timeout to {} ms", value);
 +    }
 +
 +    public int getStreamingSocketTimeout()
 +    {
 +        return DatabaseDescriptor.getStreamingSocketTimeout();
 +    }
 +
      public void setStreamThroughputMbPerSec(int value)
      {
+         int oldValue = 
DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec();
          DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(value);
-         logger.info("setstreamthroughput: throttle set to {}", value);
+         StreamManager.StreamRateLimiter.updateThroughput();
+         logger.info("setstreamthroughput: throttle set to {} Mb/s (was {} 
Mb/s)", value, oldValue);
      }
  
      public int getStreamThroughputMbPerSec()
diff --cc test/unit/org/apache/cassandra/streaming/StreamManagerTest.java
index 0000000,69db960..625d9d5
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java
@@@ -1,0 -1,90 +1,91 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.streaming;
+ 
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.service.StorageService;
+ 
+ import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+ import static 
org.apache.cassandra.streaming.StreamManager.StreamRateLimiter.BYTES_PER_MEGABIT;
+ import static org.junit.Assert.assertEquals;
+ 
+ public class StreamManagerTest
+ {
+     private static int defaultStreamThroughputMbPerSec;
+     private static int defaultInterDCStreamThroughputMbPerSec;
+ 
+     @BeforeClass
+     public static void setupClass()
+     {
+         Config c = DatabaseDescriptor.loadConfig();
+         defaultStreamThroughputMbPerSec = 
c.stream_throughput_outbound_megabits_per_sec;
+         defaultInterDCStreamThroughputMbPerSec = 
c.inter_dc_stream_throughput_outbound_megabits_per_sec;
++        DatabaseDescriptor.daemonInitialization(() -> c);
+     }
+ 
+     @Test
+     public void testUpdateStreamThroughput()
+     {
+         // Initialized value check
+         assertEquals(defaultStreamThroughputMbPerSec * BYTES_PER_MEGABIT, 
StreamRateLimiter.getRateLimiterRateInBytes(), 0);
+ 
+         // Positive value check
+         StorageService.instance.setStreamThroughputMbPerSec(500);
+         assertEquals(500.0d * BYTES_PER_MEGABIT, 
StreamRateLimiter.getRateLimiterRateInBytes(), 0);
+ 
+         // Max positive value check
+         
StorageService.instance.setStreamThroughputMbPerSec(Integer.MAX_VALUE);
+         assertEquals(Integer.MAX_VALUE * BYTES_PER_MEGABIT, 
StreamRateLimiter.getRateLimiterRateInBytes(), 0);
+ 
+         // Zero value check
+         StorageService.instance.setStreamThroughputMbPerSec(0);
+         assertEquals(Double.MAX_VALUE, 
StreamRateLimiter.getRateLimiterRateInBytes(), 0);
+ 
+         // Negative value check
+         StorageService.instance.setStreamThroughputMbPerSec(-200);
+         assertEquals(Double.MAX_VALUE, 
StreamRateLimiter.getRateLimiterRateInBytes(), 0);
+     }
+ 
+     @Test
+     public void testUpdateInterDCStreamThroughput()
+     {
+         // Initialized value check
+         assertEquals(defaultInterDCStreamThroughputMbPerSec * 
BYTES_PER_MEGABIT, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
+ 
+         // Positive value check
+         StorageService.instance.setInterDCStreamThroughputMbPerSec(200);
+         assertEquals(200.0d * BYTES_PER_MEGABIT, 
StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
+ 
+         // Max positive value check
+         
StorageService.instance.setInterDCStreamThroughputMbPerSec(Integer.MAX_VALUE);
+         assertEquals(Integer.MAX_VALUE * BYTES_PER_MEGABIT, 
StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
+ 
+         // Zero value check
+         StorageService.instance.setInterDCStreamThroughputMbPerSec(0);
+         assertEquals(Double.MAX_VALUE, 
StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
+ 
+         // Negative value check
+         StorageService.instance.setInterDCStreamThroughputMbPerSec(-200);
+         assertEquals(Double.MAX_VALUE, 
StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
+     }
+ }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to