This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 96c80f0b89314dc805714c52dcf1846491c53cdc Merge: df1a2d4 9a1bb62 Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Mon Jan 17 14:13:41 2022 +0100 Merge branch 'cassandra-4.0' into trunk CHANGES.txt | 1 + .../locator/AbstractReplicationStrategy.java | 92 ++++++++++++----- .../apache/cassandra/locator/TokenMetadata.java | 9 +- .../distributed/test/ring/BootstrapTest.java | 6 +- .../test/ring/ReadsDuringBootstrapTest.java | 115 +++++++++++++++++++++ .../locator/AbstractReplicationStrategyTest.java | 45 ++++++++ 6 files changed, 239 insertions(+), 29 deletions(-) diff --cc test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java index c3e92f2,52d0f16..2dc7303 --- a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java @@@ -56,7 -56,7 +56,7 @@@ public class BootstrapTest extends Test .withConfig(config -> config.with(NETWORK, GOSSIP)) .start()) { -- populate(cluster,0, 100); ++ populate(cluster, 0, 100); IInstanceConfig config = cluster.newInstanceConfig(); IInvokableInstance newInstance = cluster.bootstrap(config); @@@ -95,7 -95,7 +95,7 @@@ cluster.forEach(statusToBootstrap(newInstance)); -- populate(cluster,0, 100); ++ populate(cluster, 0, 100); Assert.assertEquals(100, newInstance.executeInternal("SELECT *FROM " + KEYSPACE + ".tbl").length); } @@@ -113,7 -113,7 +113,7 @@@ .withConfig(config -> config.with(NETWORK, GOSSIP)) .start()) { -- populate(cluster,0, 100); ++ populate(cluster, 0, 100); bootstrapAndJoinNode(cluster); for (Map.Entry<Integer, Long> e : count(cluster).entrySet()) diff --cc test/distributed/org/apache/cassandra/distributed/test/ring/ReadsDuringBootstrapTest.java index 0000000,4898479..932dbdc mode 000000,100644..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/ring/ReadsDuringBootstrapTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ring/ReadsDuringBootstrapTest.java @@@ -1,0 -1,114 +1,115 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.distributed.test.ring; + + import java.io.IOException; + import java.util.concurrent.Callable; + import java.util.concurrent.CountDownLatch; + import java.util.concurrent.ExecutionException; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Executors; + import java.util.concurrent.Future; + import java.util.concurrent.TimeoutException; + import java.util.concurrent.atomic.AtomicBoolean; + + import org.junit.Test; + + import net.bytebuddy.ByteBuddy; + import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; + import net.bytebuddy.implementation.MethodDelegation; + import net.bytebuddy.implementation.bind.annotation.FieldValue; + import net.bytebuddy.implementation.bind.annotation.SuperCall; + import org.apache.cassandra.dht.Token; + import org.apache.cassandra.distributed.Cluster; + import org.apache.cassandra.distributed.api.ConsistencyLevel; + import org.apache.cassandra.distributed.api.TokenSupplier; + import org.apache.cassandra.distributed.shared.NetworkTopology; + import org.apache.cassandra.distributed.test.TestBaseImpl; + import org.apache.cassandra.locator.AbstractReplicationStrategy; + import org.apache.cassandra.locator.EndpointsForRange; + + import static net.bytebuddy.matcher.ElementMatchers.named; + import static org.apache.cassandra.distributed.api.Feature.GOSSIP; + import static org.apache.cassandra.distributed.api.Feature.NETWORK; + + public class ReadsDuringBootstrapTest extends TestBaseImpl + { ++ + @Test + public void readsDuringBootstrapTest() throws IOException, ExecutionException, InterruptedException, TimeoutException + { + int originalNodeCount = 3; + int expandedNodeCount = originalNodeCount + 1; + ExecutorService es = Executors.newSingleThreadExecutor(); + try (Cluster cluster = builder().withNodes(originalNodeCount) + .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount)) + .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) + .withConfig(config -> config.with(NETWORK, GOSSIP) + .set("read_request_timeout_in_ms", Integer.MAX_VALUE) + .set("request_timeout_in_ms", Integer.MAX_VALUE)) + .withInstanceInitializer(BB::install) + .start()) + { + String query = withKeyspace("SELECT * FROM %s.tbl WHERE id = ?"); + cluster.schemaChange(withKeyspace("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};")); + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (id int PRIMARY KEY)")); + cluster.get(1).runOnInstance(() -> BB.block.set(true)); + Future<?> read = es.submit(() -> cluster.coordinator(1).execute(query, ConsistencyLevel.QUORUM, 3)); + long mark = cluster.get(1).logs().mark(); + bootstrapAndJoinNode(cluster); + cluster.get(1).logs().watchFor(mark, "New node /127.0.0.4"); + cluster.get(1).runOnInstance(() -> BB.block.set(false)); + // populate cache + for (int i = 0; i < 10; i++) + cluster.coordinator(1).execute(query, ConsistencyLevel.QUORUM, i); + cluster.get(1).runOnInstance(() -> BB.latch.countDown()); + read.get(); + } + finally + { + es.shutdown(); + } + } + + public static class BB + { + public static final AtomicBoolean block = new AtomicBoolean(); + public static final CountDownLatch latch = new CountDownLatch(1); ++ + private static void install(ClassLoader cl, Integer instanceId) + { + if (instanceId != 1) + return; + new ByteBuddy().rebase(AbstractReplicationStrategy.class) + .method(named("getCachedReplicas")) + .intercept(MethodDelegation.to(BB.class)) + .make() + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + + public static EndpointsForRange getCachedReplicas(long ringVersion, Token t, + @FieldValue("keyspaceName") String keyspaceName, + @SuperCall Callable<EndpointsForRange> zuper) throws Exception + { + if (keyspaceName.equals(KEYSPACE) && block.get()) + latch.await(); + return zuper.call(); + } + } - + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org