This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 96c80f0b89314dc805714c52dcf1846491c53cdc
Merge: df1a2d4 9a1bb62
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Mon Jan 17 14:13:41 2022 +0100

    Merge branch 'cassandra-4.0' into trunk

 CHANGES.txt                                        |   1 +
 .../locator/AbstractReplicationStrategy.java       |  92 ++++++++++++-----
 .../apache/cassandra/locator/TokenMetadata.java    |   9 +-
 .../distributed/test/ring/BootstrapTest.java       |   6 +-
 .../test/ring/ReadsDuringBootstrapTest.java        | 115 +++++++++++++++++++++
 .../locator/AbstractReplicationStrategyTest.java   |  45 ++++++++
 6 files changed, 239 insertions(+), 29 deletions(-)

diff --cc test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
index c3e92f2,52d0f16..2dc7303
--- a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
@@@ -56,7 -56,7 +56,7 @@@ public class BootstrapTest extends Test
                                          .withConfig(config -> 
config.with(NETWORK, GOSSIP))
                                          .start())
          {
--            populate(cluster,0, 100);
++            populate(cluster, 0, 100);
  
              IInstanceConfig config = cluster.newInstanceConfig();
              IInvokableInstance newInstance = cluster.bootstrap(config);
@@@ -95,7 -95,7 +95,7 @@@
  
              cluster.forEach(statusToBootstrap(newInstance));
  
--            populate(cluster,0, 100);
++            populate(cluster, 0, 100);
  
              Assert.assertEquals(100, newInstance.executeInternal("SELECT 
*FROM " + KEYSPACE + ".tbl").length);
          }
@@@ -113,7 -113,7 +113,7 @@@
                                          .withConfig(config -> 
config.with(NETWORK, GOSSIP))
                                          .start())
          {
--            populate(cluster,0, 100);
++            populate(cluster, 0, 100);
              bootstrapAndJoinNode(cluster);
  
              for (Map.Entry<Integer, Long> e : count(cluster).entrySet())
diff --cc test/distributed/org/apache/cassandra/distributed/test/ring/ReadsDuringBootstrapTest.java
index 0000000,4898479..932dbdc
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ring/ReadsDuringBootstrapTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ring/ReadsDuringBootstrapTest.java
@@@ -1,0 -1,114 +1,115 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test.ring;
+ 
+ import java.io.IOException;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.TimeoutException;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ 
+ import org.junit.Test;
+ 
+ import net.bytebuddy.ByteBuddy;
+ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+ import net.bytebuddy.implementation.MethodDelegation;
+ import net.bytebuddy.implementation.bind.annotation.FieldValue;
+ import net.bytebuddy.implementation.bind.annotation.SuperCall;
+ import org.apache.cassandra.dht.Token;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
+ import org.apache.cassandra.distributed.api.TokenSupplier;
+ import org.apache.cassandra.distributed.shared.NetworkTopology;
+ import org.apache.cassandra.distributed.test.TestBaseImpl;
+ import org.apache.cassandra.locator.AbstractReplicationStrategy;
+ import org.apache.cassandra.locator.EndpointsForRange;
+ 
+ import static net.bytebuddy.matcher.ElementMatchers.named;
+ import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+ import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+ 
+ public class ReadsDuringBootstrapTest extends TestBaseImpl
+ {
++
+     @Test
+     public void readsDuringBootstrapTest() throws IOException, 
ExecutionException, InterruptedException, TimeoutException
+     {
+         int originalNodeCount = 3;
+         int expandedNodeCount = originalNodeCount + 1;
+         ExecutorService es = Executors.newSingleThreadExecutor();
+         try (Cluster cluster = builder().withNodes(originalNodeCount)
+                                         
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
+                                         
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, 
"dc0", "rack0"))
+                                         .withConfig(config -> 
config.with(NETWORK, GOSSIP)
+                                                                     
.set("read_request_timeout_in_ms", Integer.MAX_VALUE)
+                                                                     
.set("request_timeout_in_ms", Integer.MAX_VALUE))
+                                         .withInstanceInitializer(BB::install)
+                                         .start())
+         {
+             String query = withKeyspace("SELECT * FROM %s.tbl WHERE id = ?");
+             cluster.schemaChange(withKeyspace("CREATE KEYSPACE IF NOT EXISTS 
%s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};"));
+             cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (id int 
PRIMARY KEY)"));
+             cluster.get(1).runOnInstance(() -> BB.block.set(true));
+             Future<?> read = es.submit(() -> 
cluster.coordinator(1).execute(query, ConsistencyLevel.QUORUM, 3));
+             long mark = cluster.get(1).logs().mark();
+             bootstrapAndJoinNode(cluster);
+             cluster.get(1).logs().watchFor(mark, "New node /127.0.0.4");
+             cluster.get(1).runOnInstance(() -> BB.block.set(false));
+             // populate cache
+             for (int i = 0; i < 10; i++)
+                 cluster.coordinator(1).execute(query, 
ConsistencyLevel.QUORUM, i);
+             cluster.get(1).runOnInstance(() -> BB.latch.countDown());
+             read.get();
+         }
+         finally
+         {
+             es.shutdown();
+         }
+     }
+ 
+     public static class BB
+     {
+         public static final AtomicBoolean block = new AtomicBoolean();
+         public static final CountDownLatch latch = new CountDownLatch(1);
++
+         private static void install(ClassLoader cl, Integer instanceId)
+         {
+             if (instanceId != 1)
+                 return;
+             new ByteBuddy().rebase(AbstractReplicationStrategy.class)
+                            .method(named("getCachedReplicas"))
+                            .intercept(MethodDelegation.to(BB.class))
+                            .make()
+                            .load(cl, ClassLoadingStrategy.Default.INJECTION);
+         }
+ 
+         public static EndpointsForRange getCachedReplicas(long ringVersion, 
Token t,
+                                                           
@FieldValue("keyspaceName") String keyspaceName,
+                                                           @SuperCall 
Callable<EndpointsForRange> zuper) throws Exception
+         {
+             if (keyspaceName.equals(KEYSPACE) && block.get())
+                 latch.await();
+             return zuper.call();
+         }
+     }
 -
+ }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to