This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cbe07fd57e Reconfigure CMS after replacement, bootstrap and move operations
cbe07fd57e is described below

commit cbe07fd57e3d94a1f2512fced3f38e69ad4b3eb2
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Thu Jun 13 16:21:42 2024 +0200

    Reconfigure CMS after replacement, bootstrap and move operations
    
    Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-19705
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/metrics/TCMMetrics.java   |   7 ++
 .../org/apache/cassandra/tcm/CMSOperations.java    |  27 +++--
 .../cassandra/tcm/ClusterMetadataService.java      |  14 ++-
 .../cassandra/tcm/sequences/BootstrapAndJoin.java  |   4 +-
 .../tcm/sequences/BootstrapAndReplace.java         |   5 +-
 .../org/apache/cassandra/tcm/sequences/Move.java   |   5 +-
 .../cms/PrepareCMSReconfiguration.java             |  27 +++++
 .../apache/cassandra/tools/nodetool/CMSAdmin.java  |  27 +++--
 .../test/tcm/CMSPlacementAfterBootstrapTest.java   |  62 ++++++++++++
 .../test/tcm/CMSPlacementAfterMoveTest.java        |  56 +++++++++++
 .../test/tcm/CMSPlacementAfterReplacementTest.java | 110 +++++++++++++++++++++
 12 files changed, 324 insertions(+), 21 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index e33d453ade..e37528d201 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Reconfigure CMS after replacement, bootstrap and move operations (CASSANDRA-19705)
  * Support querying LocalStrategy tables with any partitioner (CASSANDRA-19692)
  * Relax slow_query_log_timeout for MultiNodeSAITest (CASSANDRA-19693)
  * Audit Log entries are missing identity for mTLS connections (CASSANDRA-19669)
diff --git a/src/java/org/apache/cassandra/metrics/TCMMetrics.java b/src/java/org/apache/cassandra/metrics/TCMMetrics.java
index 134a1a34e2..29858ead80 100644
--- a/src/java/org/apache/cassandra/metrics/TCMMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TCMMetrics.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+import static 
org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration.needsReconfiguration;
 
 public class TCMMetrics
 {
@@ -42,6 +43,7 @@ public class TCMMetrics
     public final Gauge<Long> currentCMSSize;
     public final Gauge<Long> unreachableCMSMembers;
     public final Gauge<Integer> isCMSMember;
+    public final Gauge<Integer> needsCMSReconfiguration;
     public final Histogram fetchedPeerLogEntries;
     public final Histogram fetchedCMSLogEntries;
     public final Timer fetchPeerLogLatency;
@@ -91,6 +93,11 @@ public class TCMMetrics
             return metadata != null && 
metadata.isCMSMember(FBUtilities.getBroadcastAddressAndPort()) ? 1 : 0;
         });
 
+        needsCMSReconfiguration = 
Metrics.register(factory.createMetricName("NeedsCMSReconfiguration"), () -> {
+            ClusterMetadata metadata =  ClusterMetadata.currentNullable();
+            return metadata != null && needsReconfiguration(metadata) ? 1 : 0;
+        });
+
         fetchedPeerLogEntries = 
Metrics.histogram(factory.createMetricName("FetchedPeerLogEntries"), false);
         fetchPeerLogLatency = 
Metrics.timer(factory.createMetricName("FetchPeerLogLatency"));
         fetchedCMSLogEntries = 
Metrics.histogram(factory.createMetricName("FetchedCMSLogEntries"), false);
diff --git a/src/java/org/apache/cassandra/tcm/CMSOperations.java b/src/java/org/apache/cassandra/tcm/CMSOperations.java
index 0d9f4915ea..2211a2bb3d 100644
--- a/src/java/org/apache/cassandra/tcm/CMSOperations.java
+++ b/src/java/org/apache/cassandra/tcm/CMSOperations.java
@@ -41,12 +41,22 @@ import org.apache.cassandra.tcm.sequences.ReconfigureCMS;
 import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.tcm.transformations.Unregister;
 import org.apache.cassandra.tcm.transformations.cms.AdvanceCMSReconfiguration;
+import org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MBeanWrapper;
 
 public class CMSOperations implements CMSOperationsMBean
 {
     public static final String MBEAN_OBJECT_NAME = 
"org.apache.cassandra.tcm:type=CMSOperations";
+    public static final String MEMBERS = "MEMBERS";
+    public static final String NEEDS_RECONFIGURATION = "NEEDS_RECONFIGURATION";
+    public static final String IS_MEMBER = "IS_MEMBER";
+    public static final String SERVICE_STATE = "SERVICE_STATE";
+    public static final String IS_MIGRATING = "IS_MIGRATING";
+    public static final String EPOCH = "EPOCH";
+    public static final String LOCAL_PENDING = "LOCAL_PENDING";
+    public static final String COMMITS_PAUSED = "COMMITS_PAUSED";
+    public static final String REPLICATION_FACTOR = "REPLICATION_FACTOR";
 
     private static final Logger logger = 
LoggerFactory.getLogger(ClusterMetadataService.class);
     public static CMSOperations instance = new 
CMSOperations(ClusterMetadataService.instance());
@@ -131,14 +141,15 @@ public class CMSOperations implements CMSOperationsMBean
         Map<String, String> info = new HashMap<>();
         ClusterMetadata metadata = ClusterMetadata.current();
         String members = 
metadata.fullCMSMembers().stream().sorted().map(Object::toString).collect(Collectors.joining(","));
-        info.put("MEMBERS", members);
-        info.put("IS_MEMBER", 
Boolean.toString(cms.isCurrentMember(FBUtilities.getBroadcastAddressAndPort())));
-        info.put("SERVICE_STATE", 
ClusterMetadataService.state(metadata).toString());
-        info.put("IS_MIGRATING", Boolean.toString(cms.isMigrating()));
-        info.put("EPOCH", Long.toString(metadata.epoch.getEpoch()));
-        info.put("LOCAL_PENDING", 
Integer.toString(cms.log().pendingBufferSize()));
-        info.put("COMMITS_PAUSED", Boolean.toString(cms.commitsPaused()));
-        info.put("REPLICATION_FACTOR", 
ReplicationParams.meta(metadata).toString());
+        info.put(MEMBERS, members);
+        info.put(NEEDS_RECONFIGURATION, 
Boolean.toString(PrepareCMSReconfiguration.needsReconfiguration(metadata)));
+        info.put(IS_MEMBER, 
Boolean.toString(cms.isCurrentMember(FBUtilities.getBroadcastAddressAndPort())));
+        info.put(SERVICE_STATE, 
ClusterMetadataService.state(metadata).toString());
+        info.put(IS_MIGRATING, Boolean.toString(cms.isMigrating()));
+        info.put(EPOCH, Long.toString(metadata.epoch.getEpoch()));
+        info.put(LOCAL_PENDING, 
Integer.toString(cms.log().pendingBufferSize()));
+        info.put(COMMITS_PAUSED, Boolean.toString(cms.commitsPaused()));
+        info.put(REPLICATION_FACTOR, 
ReplicationParams.meta(metadata).toString());
         return info;
     }
 
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
index 7b022b04eb..43fbe8c6e9 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
@@ -362,7 +362,6 @@ public class ClusterMetadataService
         }
     }
 
-    // This method is to be used _only_ for interactive purposes (i.e. 
nodetool), since it assumes no retries are going to be attempted on reject.
     public void reconfigureCMS(ReplicationParams replicationParams)
     {
         Transformation transformation = new 
PrepareCMSReconfiguration.Complex(replicationParams);
@@ -373,6 +372,19 @@ public class ClusterMetadataService
         
InProgressSequences.finishInProgressSequences(ReconfigureCMS.SequenceKey.instance);
     }
 
+    public void ensureCMSPlacement(ClusterMetadata metadata)
+    {
+        try
+        {
+            reconfigureCMS(ReplicationParams.meta(metadata));
+        }
+        catch (Throwable t)
+        {
+            JVMStabilityInspector.inspectThrowable(t);
+            logger.warn("Could not reconfigure CMS, operator should run 
`nodetool cms reconfigure` to make sure CMS placement is correct", t);
+        }
+    }
+
     public boolean applyFromGossip(ClusterMetadata expected, ClusterMetadata 
updated)
     {
         logger.debug("Applying from gossip, current={} new={}", expected, 
updated);
diff --git a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java
index 008fc659a9..7d316e1ea1 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java
@@ -261,10 +261,11 @@ public class BootstrapAndJoin extends MultiStepOperation<Epoch>
 
                 break;
             case FINISH_JOIN:
+                ClusterMetadata metadata;
                 try
                 {
                     
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
-                    ClusterMetadataService.instance().commit(finishJoin);
+                    metadata = 
ClusterMetadataService.instance().commit(finishJoin);
                     StorageService.instance.clearTransientMode();
                 }
                 catch (Throwable e)
@@ -273,6 +274,7 @@ public class BootstrapAndJoin extends MultiStepOperation<Epoch>
                     logger.warn("Exception committing finishJoin", e);
                     return continuable();
                 }
+                ClusterMetadataService.instance().ensureCMSPlacement(metadata);
                 break;
             default:
                 return error(new IllegalStateException("Can't proceed with 
join from " + next));
diff --git a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java
index 46f39da387..e774b77ebe 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java
@@ -259,10 +259,11 @@ public class BootstrapAndReplace extends MultiStepOperation<Epoch>
                 }
                 break;
             case FINISH_REPLACE:
+                ClusterMetadata metadata;
                 try
                 {
                     
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
-                    ClusterMetadataService.instance().commit(finishReplace);
+                    metadata = 
ClusterMetadataService.instance().commit(finishReplace);
                 }
                 catch (Throwable e)
                 {
@@ -270,6 +271,8 @@ public class BootstrapAndReplace extends MultiStepOperation<Epoch>
                     logger.warn("Got exception committing finishReplace", e);
                     return halted();
                 }
+                ClusterMetadataService.instance().ensureCMSPlacement(metadata);
+
                 break;
             default:
                 return error(new IllegalStateException("Can't proceed with 
replacement from " + next));
diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java
index 4b28d7b8aa..7375aedc78 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/Move.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java
@@ -265,17 +265,18 @@ public class Move extends MultiStepOperation<Epoch>
                 }
                 break;
             case FINISH_MOVE:
+                ClusterMetadata metadata;
                 try
                 {
                     SystemKeyspace.updateLocalTokens(tokens);
-                    ClusterMetadataService.instance().commit(finishMove);
+                    metadata = 
ClusterMetadataService.instance().commit(finishMove);
                 }
                 catch (Throwable t)
                 {
                     JVMStabilityInspector.inspectThrowable(t);
                     return continuable();
                 }
-
+                ClusterMetadataService.instance().ensureCMSPlacement(metadata);
                 break;
             default:
                 return error(new IllegalStateException("Can't proceed with 
join from " + next));
diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java b/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java
index 87a4c870c5..7daee231b1 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java
@@ -26,6 +26,9 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -49,6 +52,8 @@ import static org.apache.cassandra.locator.MetaStrategy.entireRange;
 
 public class PrepareCMSReconfiguration
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(PrepareCMSReconfiguration.class);
+
     private static Transformation.Result executeInternal(ClusterMetadata prev, 
Function<ClusterMetadata.Transformer, ClusterMetadata.Transformer> transform, 
Diff diff)
     {
         LockedRanges.Key lockKey = LockedRanges.keyFor(prev.nextEpoch());
@@ -99,6 +104,11 @@ public class PrepareCMSReconfiguration
             Set<NodeId> withoutReplaced = new HashSet<>(currentCms);
             withoutReplaced.remove(toReplace);
             Set<NodeId> newCms = 
placementStrategy.reconfigure(withoutReplaced, prev);
+            if (newCms.equals(currentCms))
+            {
+                logger.info("Proposed CMS reconfiguration resulted in no 
required modifications at epoch {}", prev.epoch.getEpoch());
+                return Transformation.success(prev.transformer(), 
LockedRanges.AffectedRanges.EMPTY);
+            }
             Diff diff = diff(currentCms, newCms);
             return executeInternal(prev, t -> t, diff);
         }
@@ -162,6 +172,11 @@ public class PrepareCMSReconfiguration
                                          .collect(Collectors.toSet());
 
             Set<NodeId> newCms = placementStrategy.reconfigure(currentCms, 
prev);
+            if (newCms.equals(currentCms))
+            {
+                logger.info("Proposed CMS reconfiguration resulted in no 
required modifications at epoch {}", prev.epoch.getEpoch());
+                return Transformation.success(prev.transformer(), 
LockedRanges.AffectedRanges.EMPTY);
+            }
             Diff diff = diff(currentCms, newCms);
 
             return executeInternal(prev,
@@ -217,6 +232,18 @@ public class PrepareCMSReconfiguration
         return new Diff(additions, removals);
     }
 
+    public static boolean needsReconfiguration(ClusterMetadata metadata)
+    {
+        CMSPlacementStrategy placementStrategy = 
CMSPlacementStrategy.fromReplicationParams(ReplicationParams.meta(metadata), 
nodeId -> true);
+        Set<NodeId> currentCms = metadata.fullCMSMembers()
+                                         .stream()
+                                         .map(metadata.directory::peerId)
+                                         .collect(Collectors.toSet());
+
+        Set<NodeId> newCms = placementStrategy.reconfigure(currentCms, 
metadata);
+        return !currentCms.equals(newCms);
+    }
+
     public static class Diff
     {
         public static final Serializer serializer = new Serializer();
diff --git a/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java b/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java
index e54430a291..0ed853ba68 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java
@@ -29,6 +29,16 @@ import io.airlift.airline.Option;
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.tools.NodeTool;
 
+import static org.apache.cassandra.tcm.CMSOperations.COMMITS_PAUSED;
+import static org.apache.cassandra.tcm.CMSOperations.EPOCH;
+import static org.apache.cassandra.tcm.CMSOperations.IS_MEMBER;
+import static org.apache.cassandra.tcm.CMSOperations.IS_MIGRATING;
+import static org.apache.cassandra.tcm.CMSOperations.LOCAL_PENDING;
+import static org.apache.cassandra.tcm.CMSOperations.MEMBERS;
+import static org.apache.cassandra.tcm.CMSOperations.NEEDS_RECONFIGURATION;
+import static org.apache.cassandra.tcm.CMSOperations.REPLICATION_FACTOR;
+import static org.apache.cassandra.tcm.CMSOperations.SERVICE_STATE;
+
 public abstract class CMSAdmin extends NodeTool.NodeToolCmd
 {
     @Command(name = "describe", description = "Describe the current Cluster 
Metadata Service")
@@ -39,14 +49,15 @@ public abstract class CMSAdmin extends NodeTool.NodeToolCmd
         {
             Map<String, String> info = 
probe.getCMSOperationsProxy().describeCMS();
             output.out.printf("Cluster Metadata Service:%n");
-            output.out.printf("Members: %s%n", info.get("MEMBERS"));
-            output.out.printf("Is Member: %s%n", info.get("IS_MEMBER"));
-            output.out.printf("Service State: %s%n", 
info.get("SERVICE_STATE"));
-            output.out.printf("Is Migrating: %s%n", info.get("IS_MIGRATING"));
-            output.out.printf("Epoch: %s%n", info.get("EPOCH"));
-            output.out.printf("Local Pending Count: %s%n", 
info.get("LOCAL_PENDING"));
-            output.out.printf("Commits Paused: %s%n", 
info.get("COMMITS_PAUSED"));
-            output.out.printf("Replication factor: %s%n", 
info.get("REPLICATION_FACTOR"));
+            output.out.printf("Members: %s%n", info.get(MEMBERS));
+            output.out.printf("Needs reconfiguration: %s%n", 
info.get(NEEDS_RECONFIGURATION));
+            output.out.printf("Is Member: %s%n", info.get(IS_MEMBER));
+            output.out.printf("Service State: %s%n", info.get(SERVICE_STATE));
+            output.out.printf("Is Migrating: %s%n", info.get(IS_MIGRATING));
+            output.out.printf("Epoch: %s%n", info.get(EPOCH));
+            output.out.printf("Local Pending Count: %s%n", 
info.get(LOCAL_PENDING));
+            output.out.printf("Commits Paused: %s%n", 
info.get(COMMITS_PAUSED));
+            output.out.printf("Replication factor: %s%n", 
info.get(REPLICATION_FACTOR));
         }
     }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterBootstrapTest.java
new file mode 100644
index 0000000000..67a852ff80
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterBootstrapTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tcm;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.Constants;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.tcm.ClusterMetadata;
+
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin;
+import static 
org.apache.cassandra.distributed.test.tcm.CMSPlacementAfterReplacementTest.assertInCMS;
+
+public class CMSPlacementAfterBootstrapTest extends TestBaseImpl
+{
+    @Test
+    public void testBootstrapToCMS() throws IOException
+    {
+        TokenSupplier even = TokenSupplier.evenlyDistributedTokens(3);
+        try (Cluster cluster = init(Cluster.build(3)
+                                           .withConfig(c -> 
c.with(Feature.GOSSIP, Feature.NETWORK))
+                                           
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
+                                           .withTokenSupplier(node -> node == 
4 ? even.token(1) + 100 : even.token(node))
+                                           .start()))
+        {
+            cluster.get(1).nodetoolResult("cms", "reconfigure", 
"3").asserts().success();
+            IInstanceConfig config = cluster.newInstanceConfig()
+                                            .set("auto_bootstrap", true)
+                                            
.set(Constants.KEY_DTEST_FULL_STARTUP, true);
+            IInvokableInstance toBootstrap = cluster.bootstrap(config);
+            toBootstrap.startup(cluster);
+            awaitRingJoin(cluster.get(1), toBootstrap);
+            awaitRingJoin(toBootstrap, cluster.get(1));
+            int joinNodeId = cluster.get(4).callOnInstance(() -> 
ClusterMetadata.current().myNodeId().id());
+            assertInCMS(cluster, joinNodeId);
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterMoveTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterMoveTest.java
new file mode 100644
index 0000000000..8b66e97c41
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterMoveTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tcm;
+
+import java.io.IOException;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.tcm.ClusterMetadata;
+
+import static 
org.apache.cassandra.distributed.test.tcm.CMSPlacementAfterReplacementTest.assertInCMS;
+
+public class CMSPlacementAfterMoveTest extends TestBaseImpl
+{
+    @Test
+    public void testMoveToCMS() throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(4)
+                                           .withConfig(c -> 
c.with(Feature.GOSSIP, Feature.NETWORK))
+                                           .start()))
+        {
+            cluster.get(1).nodetoolResult("cms", "reconfigure", 
"3").asserts().success();
+            long node1Token = cluster.get(1).callOnInstance(() -> {
+                ClusterMetadata metadata = ClusterMetadata.current();
+                ImmutableList<Token> tokens = 
ClusterMetadata.current().tokenMap.tokens(metadata.myNodeId());
+                return ((Murmur3Partitioner.LongToken) tokens.get(0)).token;
+            });
+            long newNode4Token = node1Token + 100; // token after node1s token 
should be in cms
+            cluster.get(4).nodetoolResult("move", 
String.valueOf(newNode4Token));
+            int moveNodeId = cluster.get(4).callOnInstance(() -> 
ClusterMetadata.current().myNodeId().id());
+            assertInCMS(cluster, moveNodeId);
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterReplacementTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterReplacementTest.java
new file mode 100644
index 0000000000..4a78f8805a
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterReplacementTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tcm;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.Uninterruptibles;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.distributed.shared.ClusterUtils.addInstance;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.startHostReplacement;
+import static org.junit.Assert.assertTrue;
+
+public class CMSPlacementAfterReplacementTest extends TestBaseImpl
+{
+    @Test
+    public void replaceSmallerRF() throws IOException, ExecutionException, 
InterruptedException
+    {
+        TokenSupplier even = TokenSupplier.evenlyDistributedTokens(4);
+        try (Cluster cluster = init(Cluster.build(4)
+                                      .withConfig(c -> c.with(Feature.GOSSIP, 
Feature.NETWORK))
+                                      .withTokenSupplier(node -> 
even.token(node == 5 ? 2 : node))
+                                      .start()))
+        {
+            replacementHelper(cluster);
+        }
+    }
+
+    @Test
+    public void replaceEqualRF() throws IOException, ExecutionException, 
InterruptedException
+    {
+        TokenSupplier even = TokenSupplier.evenlyDistributedTokens(3);
+        try (Cluster cluster = init(Cluster.build(3)
+                                           .withConfig(c -> 
c.with(Feature.GOSSIP, Feature.NETWORK))
+                                           .withTokenSupplier(node -> 
even.token(node == 4 ? 2 : node))
+                                           .start()))
+        {
+            replacementHelper(cluster);
+        }
+    }
+
+    /**
+     * 1. make the CMS contain 3 nodes
+     * 2. make sure node2 is in the CMS
+     * 3. replace node2
+     * 4. make sure the replacement node appears as a member of the CMS
+     */
+    private static void replacementHelper(Cluster cluster) throws 
ExecutionException, InterruptedException
+    {
+        IInvokableInstance nodeToRemove = cluster.get(2);
+        cluster.get(1).nodetoolResult("cms", "reconfigure", 
"3").asserts().success();
+        cluster.get(2).runOnInstance(() -> {
+            
assertTrue(ClusterMetadata.current().isCMSMember(FBUtilities.getBroadcastAddressAndPort()));
+        });
+        nodeToRemove.shutdown().get();
+        IInvokableInstance replacingNode = addInstance(cluster, 
nodeToRemove.config(),
+                                                       c -> 
c.set("auto_bootstrap", true)
+                                                             
.set("progress_barrier_min_consistency_level", ConsistencyLevel.ONE));
+        startHostReplacement(nodeToRemove, replacingNode, (ignore1_, ignore2_) 
-> {});
+        awaitRingJoin(cluster.get(1), replacingNode);
+        awaitRingJoin(replacingNode, cluster.get(1));
+        int replacementNodeId = replacingNode.callOnInstance(() -> 
ClusterMetadata.current().myNodeId().id());
+        assertInCMS(cluster, replacementNodeId);
+    }
+
+    static void assertInCMS(Cluster cluster, int nodeId)
+    {
+        cluster.get(1).runOnInstance(() -> {
+            InetAddressAndPort ep = 
ClusterMetadata.current().directory.endpoint(new NodeId(nodeId));
+            int tries = 0;
+            while (!ClusterMetadata.current().isCMSMember(ep))
+            {
+                if (tries > 10)
+                    throw new AssertionError(ep + " did not become a CMS 
member after " + tries + " seconds");
+                tries++;
+                Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+            }
+        });
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to