This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new cbe07fd57e Reconfigure CMS after replacement, bootstrap and move operations cbe07fd57e is described below commit cbe07fd57e3d94a1f2512fced3f38e69ad4b3eb2 Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Thu Jun 13 16:21:42 2024 +0200 Reconfigure CMS after replacement, bootstrap and move operations Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-19705 --- CHANGES.txt | 1 + .../org/apache/cassandra/metrics/TCMMetrics.java | 7 ++ .../org/apache/cassandra/tcm/CMSOperations.java | 27 +++-- .../cassandra/tcm/ClusterMetadataService.java | 14 ++- .../cassandra/tcm/sequences/BootstrapAndJoin.java | 4 +- .../tcm/sequences/BootstrapAndReplace.java | 5 +- .../org/apache/cassandra/tcm/sequences/Move.java | 5 +- .../cms/PrepareCMSReconfiguration.java | 27 +++++ .../apache/cassandra/tools/nodetool/CMSAdmin.java | 27 +++-- .../test/tcm/CMSPlacementAfterBootstrapTest.java | 62 ++++++++++++ .../test/tcm/CMSPlacementAfterMoveTest.java | 56 +++++++++++ .../test/tcm/CMSPlacementAfterReplacementTest.java | 110 +++++++++++++++++++++ 12 files changed, 324 insertions(+), 21 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index e33d453ade..e37528d201 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Reconfigure CMS after replacement, bootstrap and move operations (CASSANDRA-19705) * Support querying LocalStrategy tables with any partitioner (CASSANDRA-19692) * Relax slow_query_log_timeout for MultiNodeSAITest (CASSANDRA-19693) * Audit Log entries are missing identity for mTLS connections (CASSANDRA-19669) diff --git a/src/java/org/apache/cassandra/metrics/TCMMetrics.java b/src/java/org/apache/cassandra/metrics/TCMMetrics.java index 134a1a34e2..29858ead80 100644 --- a/src/java/org/apache/cassandra/metrics/TCMMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TCMMetrics.java @@ -30,6 +30,7 @@ import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; +import static org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration.needsReconfiguration; public class TCMMetrics { @@ -42,6 +43,7 @@ public class TCMMetrics public final Gauge<Long> currentCMSSize; public final Gauge<Long> unreachableCMSMembers; public final Gauge<Integer> isCMSMember; + public final Gauge<Integer> needsCMSReconfiguration; public final Histogram fetchedPeerLogEntries; public final Histogram fetchedCMSLogEntries; public final Timer fetchPeerLogLatency; @@ -91,6 +93,11 @@ public class TCMMetrics return metadata != null && metadata.isCMSMember(FBUtilities.getBroadcastAddressAndPort()) ? 1 : 0; }); + needsCMSReconfiguration = Metrics.register(factory.createMetricName("NeedsCMSReconfiguration"), () -> { + ClusterMetadata metadata = ClusterMetadata.currentNullable(); + return metadata != null && needsReconfiguration(metadata) ? 1 : 0; + }); + fetchedPeerLogEntries = Metrics.histogram(factory.createMetricName("FetchedPeerLogEntries"), false); fetchPeerLogLatency = Metrics.timer(factory.createMetricName("FetchPeerLogLatency")); fetchedCMSLogEntries = Metrics.histogram(factory.createMetricName("FetchedCMSLogEntries"), false); diff --git a/src/java/org/apache/cassandra/tcm/CMSOperations.java b/src/java/org/apache/cassandra/tcm/CMSOperations.java index 0d9f4915ea..2211a2bb3d 100644 --- a/src/java/org/apache/cassandra/tcm/CMSOperations.java +++ b/src/java/org/apache/cassandra/tcm/CMSOperations.java @@ -41,12 +41,22 @@ import org.apache.cassandra.tcm.sequences.ReconfigureCMS; import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.tcm.transformations.Unregister; import org.apache.cassandra.tcm.transformations.cms.AdvanceCMSReconfiguration; +import org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MBeanWrapper; public class CMSOperations implements CMSOperationsMBean { public static final String MBEAN_OBJECT_NAME = "org.apache.cassandra.tcm:type=CMSOperations"; + public static final String MEMBERS = "MEMBERS"; + public static final String NEEDS_RECONFIGURATION = "NEEDS_RECONFIGURATION"; + public static final String IS_MEMBER = "IS_MEMBER"; + public static final String SERVICE_STATE = "SERVICE_STATE"; + public static final String IS_MIGRATING = "IS_MIGRATING"; + public static final String EPOCH = "EPOCH"; + public static final String LOCAL_PENDING = "LOCAL_PENDING"; + public static final String COMMITS_PAUSED = "COMMITS_PAUSED"; + public static final String REPLICATION_FACTOR = "REPLICATION_FACTOR"; private static final Logger logger = LoggerFactory.getLogger(ClusterMetadataService.class); public static CMSOperations instance = new CMSOperations(ClusterMetadataService.instance()); @@ -131,14 +141,15 @@ public class CMSOperations implements CMSOperationsMBean Map<String, String> info = new HashMap<>(); ClusterMetadata metadata = ClusterMetadata.current(); String members = metadata.fullCMSMembers().stream().sorted().map(Object::toString).collect(Collectors.joining(",")); - info.put("MEMBERS", members); - info.put("IS_MEMBER", Boolean.toString(cms.isCurrentMember(FBUtilities.getBroadcastAddressAndPort()))); - info.put("SERVICE_STATE", ClusterMetadataService.state(metadata).toString()); - info.put("IS_MIGRATING", Boolean.toString(cms.isMigrating())); - info.put("EPOCH", Long.toString(metadata.epoch.getEpoch())); - info.put("LOCAL_PENDING", Integer.toString(cms.log().pendingBufferSize())); - info.put("COMMITS_PAUSED", Boolean.toString(cms.commitsPaused())); - info.put("REPLICATION_FACTOR", ReplicationParams.meta(metadata).toString()); + info.put(MEMBERS, members); + info.put(NEEDS_RECONFIGURATION, Boolean.toString(PrepareCMSReconfiguration.needsReconfiguration(metadata))); + info.put(IS_MEMBER, Boolean.toString(cms.isCurrentMember(FBUtilities.getBroadcastAddressAndPort()))); + info.put(SERVICE_STATE, ClusterMetadataService.state(metadata).toString()); + info.put(IS_MIGRATING, Boolean.toString(cms.isMigrating())); + info.put(EPOCH, Long.toString(metadata.epoch.getEpoch())); + info.put(LOCAL_PENDING, Integer.toString(cms.log().pendingBufferSize())); + info.put(COMMITS_PAUSED, Boolean.toString(cms.commitsPaused())); + info.put(REPLICATION_FACTOR, ReplicationParams.meta(metadata).toString()); return info; } diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java index 7b022b04eb..43fbe8c6e9 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java @@ -362,7 +362,6 @@ public class ClusterMetadataService } } - // This method is to be used _only_ for interactive purposes (i.e. nodetool), since it assumes no retries are going to be attempted on reject. public void reconfigureCMS(ReplicationParams replicationParams) { Transformation transformation = new PrepareCMSReconfiguration.Complex(replicationParams); @@ -373,6 +372,19 @@ public class ClusterMetadataService InProgressSequences.finishInProgressSequences(ReconfigureCMS.SequenceKey.instance); } + public void ensureCMSPlacement(ClusterMetadata metadata) + { + try + { + reconfigureCMS(ReplicationParams.meta(metadata)); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + logger.warn("Could not reconfigure CMS, operator should run `nodetool cms reconfigure` to make sure CMS placement is correct", t); + } + } + public boolean applyFromGossip(ClusterMetadata expected, ClusterMetadata updated) { logger.debug("Applying from gossip, current={} new={}", expected, updated); diff --git a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java index 008fc659a9..7d316e1ea1 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java +++ b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java @@ -261,10 +261,11 @@ public class BootstrapAndJoin extends MultiStepOperation<Epoch> break; case FINISH_JOIN: + ClusterMetadata metadata; try { SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED); - ClusterMetadataService.instance().commit(finishJoin); + metadata = ClusterMetadataService.instance().commit(finishJoin); StorageService.instance.clearTransientMode(); } catch (Throwable e) @@ -273,6 +274,7 @@ public class BootstrapAndJoin extends MultiStepOperation<Epoch> logger.warn("Exception committing finishJoin", e); return continuable(); } + ClusterMetadataService.instance().ensureCMSPlacement(metadata); break; default: return error(new IllegalStateException("Can't proceed with join from " + next)); diff --git a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java index 46f39da387..e774b77ebe 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java +++ b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java @@ -259,10 +259,11 @@ public class BootstrapAndReplace extends MultiStepOperation<Epoch> } break; case FINISH_REPLACE: + ClusterMetadata metadata; try { SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED); - ClusterMetadataService.instance().commit(finishReplace); + metadata = ClusterMetadataService.instance().commit(finishReplace); } catch (Throwable e) { @@ -270,6 +271,8 @@ public class BootstrapAndReplace extends MultiStepOperation<Epoch> logger.warn("Got exception committing finishReplace", e); return halted(); } + ClusterMetadataService.instance().ensureCMSPlacement(metadata); + break; default: return error(new IllegalStateException("Can't proceed with replacement from " + next)); diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java index 4b28d7b8aa..7375aedc78 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/Move.java +++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java @@ -265,17 +265,18 @@ public class Move extends MultiStepOperation<Epoch> } break; case FINISH_MOVE: + ClusterMetadata metadata; try { SystemKeyspace.updateLocalTokens(tokens); - ClusterMetadataService.instance().commit(finishMove); + metadata = ClusterMetadataService.instance().commit(finishMove); } catch (Throwable t) { JVMStabilityInspector.inspectThrowable(t); return continuable(); } - + ClusterMetadataService.instance().ensureCMSPlacement(metadata); break; default: return error(new IllegalStateException("Can't proceed with join from " + next)); diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java b/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java index 87a4c870c5..7daee231b1 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java +++ b/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java @@ -26,6 +26,9 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -49,6 +52,8 @@ import static org.apache.cassandra.locator.MetaStrategy.entireRange; public class PrepareCMSReconfiguration { + private static final Logger logger = LoggerFactory.getLogger(PrepareCMSReconfiguration.class); + private static Transformation.Result executeInternal(ClusterMetadata prev, Function<ClusterMetadata.Transformer, ClusterMetadata.Transformer> transform, Diff diff) { LockedRanges.Key lockKey = LockedRanges.keyFor(prev.nextEpoch()); @@ -99,6 +104,11 @@ public class PrepareCMSReconfiguration Set<NodeId> withoutReplaced = new HashSet<>(currentCms); withoutReplaced.remove(toReplace); Set<NodeId> newCms = placementStrategy.reconfigure(withoutReplaced, prev); + if (newCms.equals(currentCms)) + { + logger.info("Proposed CMS reconfiguration resulted in no required modifications at epoch {}", prev.epoch.getEpoch()); + return Transformation.success(prev.transformer(), LockedRanges.AffectedRanges.EMPTY); + } Diff diff = diff(currentCms, newCms); return executeInternal(prev, t -> t, diff); } @@ -162,6 +172,11 @@ public class PrepareCMSReconfiguration .collect(Collectors.toSet()); Set<NodeId> newCms = placementStrategy.reconfigure(currentCms, prev); + if (newCms.equals(currentCms)) + { + logger.info("Proposed CMS reconfiguration resulted in no required modifications at epoch {}", prev.epoch.getEpoch()); + return Transformation.success(prev.transformer(), LockedRanges.AffectedRanges.EMPTY); + } Diff diff = diff(currentCms, newCms); return executeInternal(prev, @@ -217,6 +232,18 @@ public class PrepareCMSReconfiguration return new Diff(additions, removals); } + public static boolean needsReconfiguration(ClusterMetadata metadata) + { + CMSPlacementStrategy placementStrategy = CMSPlacementStrategy.fromReplicationParams(ReplicationParams.meta(metadata), nodeId -> true); + Set<NodeId> currentCms = metadata.fullCMSMembers() + .stream() + .map(metadata.directory::peerId) + .collect(Collectors.toSet()); + + Set<NodeId> newCms = placementStrategy.reconfigure(currentCms, metadata); + return !currentCms.equals(newCms); + } + public static class Diff { public static final Serializer serializer = new Serializer(); diff --git a/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java b/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java index e54430a291..0ed853ba68 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java +++ b/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java @@ -29,6 +29,16 @@ import io.airlift.airline.Option; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.NodeTool; +import static org.apache.cassandra.tcm.CMSOperations.COMMITS_PAUSED; +import static org.apache.cassandra.tcm.CMSOperations.EPOCH; +import static org.apache.cassandra.tcm.CMSOperations.IS_MEMBER; +import static org.apache.cassandra.tcm.CMSOperations.IS_MIGRATING; +import static org.apache.cassandra.tcm.CMSOperations.LOCAL_PENDING; +import static org.apache.cassandra.tcm.CMSOperations.MEMBERS; +import static org.apache.cassandra.tcm.CMSOperations.NEEDS_RECONFIGURATION; +import static org.apache.cassandra.tcm.CMSOperations.REPLICATION_FACTOR; +import static org.apache.cassandra.tcm.CMSOperations.SERVICE_STATE; + public abstract class CMSAdmin extends NodeTool.NodeToolCmd { @Command(name = "describe", description = "Describe the current Cluster Metadata Service") @@ -39,14 +49,15 @@ public abstract class CMSAdmin extends NodeTool.NodeToolCmd { Map<String, String> info = probe.getCMSOperationsProxy().describeCMS(); output.out.printf("Cluster Metadata Service:%n"); - output.out.printf("Members: %s%n", info.get("MEMBERS")); - output.out.printf("Is Member: %s%n", info.get("IS_MEMBER")); - output.out.printf("Service State: %s%n", info.get("SERVICE_STATE")); - output.out.printf("Is Migrating: %s%n", info.get("IS_MIGRATING")); - output.out.printf("Epoch: %s%n", info.get("EPOCH")); - output.out.printf("Local Pending Count: %s%n", info.get("LOCAL_PENDING")); - output.out.printf("Commits Paused: %s%n", info.get("COMMITS_PAUSED")); - output.out.printf("Replication factor: %s%n", info.get("REPLICATION_FACTOR")); + output.out.printf("Members: %s%n", info.get(MEMBERS)); + output.out.printf("Needs reconfiguration: %s%n", info.get(NEEDS_RECONFIGURATION)); + output.out.printf("Is Member: %s%n", info.get(IS_MEMBER)); + output.out.printf("Service State: %s%n", info.get(SERVICE_STATE)); + output.out.printf("Is Migrating: %s%n", info.get(IS_MIGRATING)); + output.out.printf("Epoch: %s%n", info.get(EPOCH)); + output.out.printf("Local Pending Count: %s%n", info.get(LOCAL_PENDING)); + output.out.printf("Commits Paused: %s%n", info.get(COMMITS_PAUSED)); + output.out.printf("Replication factor: %s%n", info.get(REPLICATION_FACTOR)); } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterBootstrapTest.java new file mode 100644 index 0000000000..67a852ff80 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterBootstrapTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.tcm; + +import java.io.IOException; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.Constants; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.shared.NetworkTopology; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.tcm.ClusterMetadata; + +import static org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin; +import static org.apache.cassandra.distributed.test.tcm.CMSPlacementAfterReplacementTest.assertInCMS; + +public class CMSPlacementAfterBootstrapTest extends TestBaseImpl +{ + @Test + public void testBootstrapToCMS() throws IOException + { + TokenSupplier even = TokenSupplier.evenlyDistributedTokens(3); + try (Cluster cluster = init(Cluster.build(3) + .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK)) + .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0")) + .withTokenSupplier(node -> node == 4 ? even.token(1) + 100 : even.token(node)) + .start())) + { + cluster.get(1).nodetoolResult("cms", "reconfigure", "3").asserts().success(); + IInstanceConfig config = cluster.newInstanceConfig() + .set("auto_bootstrap", true) + .set(Constants.KEY_DTEST_FULL_STARTUP, true); + IInvokableInstance toBootstrap = cluster.bootstrap(config); + toBootstrap.startup(cluster); + awaitRingJoin(cluster.get(1), toBootstrap); + awaitRingJoin(toBootstrap, cluster.get(1)); + int joinNodeId = cluster.get(4).callOnInstance(() -> ClusterMetadata.current().myNodeId().id()); + assertInCMS(cluster, joinNodeId); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterMoveTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterMoveTest.java new file mode 100644 index 0000000000..8b66e97c41 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterMoveTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.tcm; + +import java.io.IOException; + +import com.google.common.collect.ImmutableList; +import org.junit.Test; + +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.tcm.ClusterMetadata; + +import static org.apache.cassandra.distributed.test.tcm.CMSPlacementAfterReplacementTest.assertInCMS; + +public class CMSPlacementAfterMoveTest extends TestBaseImpl +{ + @Test + public void testMoveToCMS() throws IOException + { + try (Cluster cluster = init(Cluster.build(4) + .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK)) + .start())) + { + cluster.get(1).nodetoolResult("cms", "reconfigure", "3").asserts().success(); + long node1Token = cluster.get(1).callOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + ImmutableList<Token> tokens = ClusterMetadata.current().tokenMap.tokens(metadata.myNodeId()); + return ((Murmur3Partitioner.LongToken) tokens.get(0)).token; + }); + long newNode4Token = node1Token + 100; // token after node1s token should be in cms + cluster.get(4).nodetoolResult("move", String.valueOf(newNode4Token)); + int moveNodeId = cluster.get(4).callOnInstance(() -> ClusterMetadata.current().myNodeId().id()); + assertInCMS(cluster, moveNodeId); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterReplacementTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterReplacementTest.java new file mode 100644 index 0000000000..4a78f8805a --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterReplacementTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.tcm; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.shared.Uninterruptibles; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.distributed.shared.ClusterUtils.addInstance; +import static org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin; +import static org.apache.cassandra.distributed.shared.ClusterUtils.startHostReplacement; +import static org.junit.Assert.assertTrue; + +public class CMSPlacementAfterReplacementTest extends TestBaseImpl +{ + @Test + public void replaceSmallerRF() throws IOException, ExecutionException, InterruptedException + { + TokenSupplier even = TokenSupplier.evenlyDistributedTokens(4); + try (Cluster cluster = init(Cluster.build(4) + .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK)) + .withTokenSupplier(node -> even.token(node == 5 ? 2 : node)) + .start())) + { + replacementHelper(cluster); + } + } + + @Test + public void replaceEqualRF() throws IOException, ExecutionException, InterruptedException + { + TokenSupplier even = TokenSupplier.evenlyDistributedTokens(3); + try (Cluster cluster = init(Cluster.build(3) + .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK)) + .withTokenSupplier(node -> even.token(node == 4 ? 2 : node)) + .start())) + { + replacementHelper(cluster); + } + } + + /** + * 1. make the CMS contain 3 nodes + * 2. make sure node2 is in the CMS + * 3. replace node2 + * 4. make sure the replacement node appears as a member of the CMS + */ + private static void replacementHelper(Cluster cluster) throws ExecutionException, InterruptedException + { + IInvokableInstance nodeToRemove = cluster.get(2); + cluster.get(1).nodetoolResult("cms", "reconfigure", "3").asserts().success(); + cluster.get(2).runOnInstance(() -> { + assertTrue(ClusterMetadata.current().isCMSMember(FBUtilities.getBroadcastAddressAndPort())); + }); + nodeToRemove.shutdown().get(); + IInvokableInstance replacingNode = addInstance(cluster, nodeToRemove.config(), + c -> c.set("auto_bootstrap", true) + .set("progress_barrier_min_consistency_level", ConsistencyLevel.ONE)); + startHostReplacement(nodeToRemove, replacingNode, (ignore1_, ignore2_) -> {}); + awaitRingJoin(cluster.get(1), replacingNode); + awaitRingJoin(replacingNode, cluster.get(1)); + int replacementNodeId = replacingNode.callOnInstance(() -> ClusterMetadata.current().myNodeId().id()); + assertInCMS(cluster, replacementNodeId); + } + + static void assertInCMS(Cluster cluster, int nodeId) + { + cluster.get(1).runOnInstance(() -> { + InetAddressAndPort ep = ClusterMetadata.current().directory.endpoint(new NodeId(nodeId)); + int tries = 0; + while (!ClusterMetadata.current().isCMSMember(ep)) + { + if (tries > 10) + throw new AssertionError(ep + " did not become a CMS member after " + tries + " seconds"); + tries++; + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + } + }); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org