This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 279c0527aa Allow CMS reconfiguration to work around DOWN nodes
279c0527aa is described below

commit 279c0527aa3d52e1474fee5f37c0227ed6f9da5f
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Thu Sep 26 15:56:03 2024 +0200

    Allow CMS reconfiguration to work around DOWN nodes
    
    Patch by Sam Tunnicliffe and marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-19943
    
    Co-authored-by: Sam Tunnicliffe <s...@apache.org>
---
 CHANGES.txt                                        |   1 +
 .../config/CassandraRelevantProperties.java        |   4 +
 .../cassandra/locator/CMSPlacementStrategy.java    | 138 ++++--------
 .../org/apache/cassandra/locator/MetaStrategy.java |  25 ++-
 .../org/apache/cassandra/metrics/TCMMetrics.java   |   8 +-
 .../streaming/DataMovementVerbHandler.java         |   4 +-
 .../org/apache/cassandra/tcm/CMSOperations.java    |   6 +-
 .../cassandra/tcm/ClusterMetadataService.java      |  18 +-
 .../cassandra/tcm/sequences/ReconfigureCMS.java    |   9 +-
 .../cassandra/tcm/serialization/Version.java       |   1 +
 .../cms/PrepareCMSReconfiguration.java             | 243 ++++++++++++++++-----
 .../apache/cassandra/distributed/test/CASTest.java |  13 +-
 .../test/log/ClusterMetadataTestHelper.java        |   2 +-
 .../test/log/MetadataChangeSimulationTest.java     |   2 +-
 .../test/log/ReconfigureCMSStreamingTest.java      |  78 +++++++
 .../distributed/test/log/ReconfigureCMSTest.java   | 145 +++++++++++-
 .../apache/cassandra/locator/MetaStrategyTest.java |   6 +-
 17 files changed, 517 insertions(+), 186 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 252b9c919c..a69a7222ff 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Allow CMS reconfiguration to work around DOWN nodes (CASSANDRA-19943)
  * Make TableParams.Serializer set allowAutoSnapshots and incrementalBackups 
(CASSANDRA-19954)
  * Make sstabledump possible to show tombstones only (CASSANDRA-19939)
  * Ensure that RFP queries potentially stale replicas even with only key 
columns in the row filter (CASSANDRA-19938)
diff --git 
a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java 
b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index a0baf8de90..56a8aad315 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -533,6 +533,10 @@ public enum CassandraRelevantProperties
      */
     
TCM_RECENTLY_SEALED_PERIOD_INDEX_SIZE("cassandra.recently_sealed_period_index_size",
 "10"),
 
+    /**
+     * For testing purposes only: disables the automatic CMS reconfiguration after a bootstrap/replace/move operation
+     */
+    
TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE("cassandra.test.skip_cms_reconfig_after_topology_change",
 "false"),
     /**
      * should replica groups in data placements be sorted to ensure the 
primary replica is first in the list
      */
diff --git a/src/java/org/apache/cassandra/locator/CMSPlacementStrategy.java 
b/src/java/org/apache/cassandra/locator/CMSPlacementStrategy.java
index 6ba47ae04d..716b40a34c 100644
--- a/src/java/org/apache/cassandra/locator/CMSPlacementStrategy.java
+++ b/src/java/org/apache/cassandra/locator/CMSPlacementStrategy.java
@@ -31,7 +31,6 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.schema.ReplicationParams;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.membership.Directory;
@@ -39,123 +38,70 @@ import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.membership.NodeState;
 import org.apache.cassandra.tcm.ownership.TokenMap;
 
-import static org.apache.cassandra.locator.SimpleStrategy.REPLICATION_FACTOR;
-
 /**
  * CMS Placement Strategy is how CMS keeps the number of its members at a 
configured level, given current
  * cluster topolgy. It allows to add and remove CMS members when cluster 
topology changes. For example, during
- * node replacement or decommission.
+ * node replacement or decommission. This attempts to achieve rack diversity, 
while keeping CMS placements
+ * close to how "regular" data would get replicated to keep the bounces safe, 
as long as the user
+ * bounces at most `f` members of the replica group, where `f = (RF - 1)/2`.
  */
-public interface CMSPlacementStrategy
+public class CMSPlacementStrategy
 {
-    Set<NodeId> reconfigure(ClusterMetadata metadata);
-    boolean needsReconfiguration(ClusterMetadata metadata);
+    public final Map<String, Integer> rf;
+    public final BiFunction<ClusterMetadata, NodeId, Boolean> filter;
 
-    static CMSPlacementStrategy fromReplicationParams(ReplicationParams 
params, Predicate<NodeId> filter)
+    public CMSPlacementStrategy(Map<String, Integer> rf, Predicate<NodeId> 
filter)
     {
-        if (params.isMeta())
-        {
-            assert !params.options.containsKey(REPLICATION_FACTOR);
-            Map<String, Integer> dcRf = new HashMap<>();
-            for (Map.Entry<String, String> entry : params.options.entrySet())
-            {
-                String dc = entry.getKey();
-                ReplicationFactor rf = 
ReplicationFactor.fromString(entry.getValue());
-                dcRf.put(dc, rf.fullReplicas);
-            }
-            return new DatacenterAware(dcRf, filter);
-        }
-        else
-        {
-            throw new IllegalStateException("Can't parse the params: " + 
params);
-        }
+        this(rf, new DefaultNodeFilter(filter));
     }
 
-    /**
-     * Default reconfiguration strategy: attempts to achieve rack diversity, 
while keeping CMS placements
-     * close to how "regular" data would get replicated to keep the bounces 
safe, as long as the user
-     * bounces at most `f` members of the replica group, where `f = RF/2`.
-     */
-    class DatacenterAware implements CMSPlacementStrategy
+    @VisibleForTesting
+    public CMSPlacementStrategy(Map<String, Integer> rf, 
BiFunction<ClusterMetadata, NodeId, Boolean> filter)
     {
-        public final Map<String, Integer> rf;
-        public final BiFunction<ClusterMetadata, NodeId, Boolean> filter;
-
-        public DatacenterAware(Map<String, Integer> rf, Predicate<NodeId> 
filter)
-        {
-            this(rf, new DefaultNodeFilter(filter));
-        }
-
-        @VisibleForTesting
-        public DatacenterAware(Map<String, Integer> rf,  
BiFunction<ClusterMetadata, NodeId, Boolean> filter)
-        {
-            this.rf = rf;
-            this.filter = filter;
-        }
+        // TODO: verify that only tests call this constructor with a filter other than DefaultNodeFilter
+        this.rf = rf;
+        this.filter = filter;
+    }
 
-        public Set<NodeId> reconfigure(ClusterMetadata metadata)
+    public Set<NodeId> reconfigure(ClusterMetadata metadata)
+    {
+        Map<String, ReplicationFactor> rf = new HashMap<>(this.rf.size());
+        for (Map.Entry<String, Integer> e : this.rf.entrySet())
         {
-            Map<String, ReplicationFactor> rf = new HashMap<>(this.rf.size());
-            for (Map.Entry<String, Integer> e : this.rf.entrySet())
-            {
-                Collection<InetAddressAndPort> nodesInDc = 
metadata.directory.allDatacenterEndpoints().get(e.getKey());
-                if (nodesInDc.isEmpty())
-                    throw new IllegalStateException(String.format("There are 
no nodes in %s datacenter", e.getKey()));
-                if (nodesInDc.size() < e.getValue())
-                    throw new 
Transformation.RejectedTransformationException(String.format("There are not 
enough nodes in %s datacenter to satisfy replication factor", e.getKey()));
+            Collection<InetAddressAndPort> nodesInDc = 
metadata.directory.allDatacenterEndpoints().get(e.getKey());
+            if (nodesInDc.isEmpty())
+                throw new IllegalStateException(String.format("There are no 
nodes in %s datacenter", e.getKey()));
+            if (nodesInDc.size() < e.getValue())
+                throw new 
Transformation.RejectedTransformationException(String.format("There are not 
enough nodes in %s datacenter to satisfy replication factor", e.getKey()));
 
-                rf.put(e.getKey(), ReplicationFactor.fullOnly(e.getValue()));
-            }
-            return reconfigure(metadata, rf);
+            rf.put(e.getKey(), ReplicationFactor.fullOnly(e.getValue()));
         }
 
-        public Set<NodeId> reconfigure(ClusterMetadata metadata, Map<String, 
ReplicationFactor> rf)
+        Directory tmpDirectory = metadata.directory;
+        TokenMap tmpTokenMap = metadata.tokenMap;
+        for (NodeId peerId : metadata.directory.peerIds())
         {
-            Directory tmpDirectory = metadata.directory;
-            TokenMap tokenMap = metadata.tokenMap;
-            for (NodeId peerId : metadata.directory.peerIds())
+            if (!filter.apply(metadata, peerId))
             {
-                if (!filter.apply(metadata, peerId))
-                {
-                    tmpDirectory = tmpDirectory.without(peerId);
-                    tokenMap = tokenMap.unassignTokens(peerId);
-                }
+                tmpDirectory = tmpDirectory.without(peerId);
+                tmpTokenMap = tmpTokenMap.unassignTokens(peerId);
             }
-
-            // Although MetaStrategy has its own entireRange, it uses a custom 
partitioner which isn't compatible with
-            // regular, non-CMS placements. For that reason, we select 
replicas here using tokens provided by the
-            // globally configured partitioner.
-            Token minToken = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
-            EndpointsForRange endpoints = 
NetworkTopologyStrategy.calculateNaturalReplicas(minToken,
-                                                                               
            new Range<>(minToken, minToken),
-                                                                               
            tmpDirectory,
-                                                                               
            tokenMap,
-                                                                               
            rf);
-
-            return 
endpoints.endpoints().stream().map(metadata.directory::peerId).collect(Collectors.toSet());
         }
 
-        public boolean needsReconfiguration(ClusterMetadata metadata)
-        {
-            Map<String, ReplicationFactor> rf = new HashMap<>(this.rf.size());
-            for (Map.Entry<String, Integer> e : this.rf.entrySet())
-            {
-                Collection<InetAddressAndPort> nodesInDc = 
metadata.directory.allDatacenterEndpoints().get(e.getKey());
-                if (nodesInDc.size() < e.getValue())
-                    return true;
-                rf.put(e.getKey(), ReplicationFactor.fullOnly(e.getValue()));
-            }
-
-            Set<NodeId> currentCms = metadata.fullCMSMembers()
-                                             .stream()
-                                             .map(metadata.directory::peerId)
-                                             .collect(Collectors.toSet());
-            Set<NodeId> newCms = reconfigure(metadata, rf);
-            return !currentCms.equals(newCms);
-        }
+        // Although MetaStrategy has its own entireRange, it uses a custom 
partitioner which isn't compatible with
+        // regular, non-CMS placements. For that reason, we select replicas 
here using tokens provided by the
+        // globally configured partitioner.
+        Token minToken = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        EndpointsForRange endpoints = 
NetworkTopologyStrategy.calculateNaturalReplicas(minToken,
+                                                                               
        new Range<>(minToken, minToken),
+                                                                               
        tmpDirectory,
+                                                                               
        tmpTokenMap,
+                                                                               
        rf);
+
+        return 
endpoints.endpoints().stream().map(metadata.directory::peerId).collect(Collectors.toSet());
     }
 
-    class DefaultNodeFilter implements BiFunction<ClusterMetadata, NodeId, 
Boolean>
+    static class DefaultNodeFilter implements BiFunction<ClusterMetadata, 
NodeId, Boolean>
     {
         private final Predicate<NodeId> filter;
 
diff --git a/src/java/org/apache/cassandra/locator/MetaStrategy.java 
b/src/java/org/apache/cassandra/locator/MetaStrategy.java
index 536ac2fa22..a7afa4898c 100644
--- a/src/java/org/apache/cassandra/locator/MetaStrategy.java
+++ b/src/java/org/apache/cassandra/locator/MetaStrategy.java
@@ -30,6 +30,8 @@ import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.ownership.DataPlacement;
 import org.apache.cassandra.tcm.sequences.LockedRanges;
 
+import static 
org.apache.cassandra.locator.NetworkTopologyStrategy.REPLICATION_FACTOR;
+
 /**
  * MetaStrategy is designed for distributed cluster metadata keyspace, and 
should not be used by
  * the users directly. This strategy allows a configurable number of nodes to 
own an entire range and
@@ -55,9 +57,26 @@ public class MetaStrategy extends SystemStrategy
         return new Replica(addr, entireRange, true);
     }
 
+    private final ReplicationFactor rf;
+
     public MetaStrategy(String keyspaceName, Map<String, String> configOptions)
     {
         super(keyspaceName, configOptions);
+        int replicas = 0;
+        if (configOptions != null)
+        {
+            for (Map.Entry<String, String> entry : configOptions.entrySet())
+            {
+                String dc = entry.getKey();
+                // prepareOptions should have transformed any 
"replication_factor" options by now
+                if (dc.equalsIgnoreCase(REPLICATION_FACTOR))
+                    continue;
+                ReplicationFactor rf = 
ReplicationFactor.fromString(entry.getValue());
+                replicas += rf.allReplicas;
+            }
+        }
+
+        rf = ReplicationFactor.fullOnly(replicas);
     }
 
     @Override
@@ -75,11 +94,7 @@ public class MetaStrategy extends SystemStrategy
     @Override
     public ReplicationFactor getReplicationFactor()
     {
-        ClusterMetadata metadata = ClusterMetadata.currentNullable();
-        if (metadata == null || metadata.epoch.isEqualOrBefore(Epoch.FIRST))
-            return ReplicationFactor.fullOnly(1);
-        int rf = 
metadata.placements.get(ReplicationParams.meta(metadata)).writes.forRange(entireRange).get().byEndpoint.size();
-        return ReplicationFactor.fullOnly(rf);
+        return rf;
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/metrics/TCMMetrics.java 
b/src/java/org/apache/cassandra/metrics/TCMMetrics.java
index e58a3b11cd..29858ead80 100644
--- a/src/java/org/apache/cassandra/metrics/TCMMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TCMMetrics.java
@@ -25,13 +25,12 @@ import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.locator.CMSPlacementStrategy;
-import org.apache.cassandra.schema.ReplicationParams;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+import static 
org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration.needsReconfiguration;
 
 public class TCMMetrics
 {
@@ -96,10 +95,7 @@ public class TCMMetrics
 
         needsCMSReconfiguration = 
Metrics.register(factory.createMetricName("NeedsCMSReconfiguration"), () -> {
             ClusterMetadata metadata =  ClusterMetadata.currentNullable();
-            if (metadata == null)
-                return 0;
-            CMSPlacementStrategy placementStrategy = 
CMSPlacementStrategy.fromReplicationParams(ReplicationParams.meta(metadata), 
nodeId -> true);
-            return placementStrategy.needsReconfiguration(metadata) ? 1 : 0;
+            return metadata != null && needsReconfiguration(metadata) ? 1 : 0;
         });
 
         fetchedPeerLogEntries = 
Metrics.histogram(factory.createMetricName("FetchedPeerLogEntries"), false);
diff --git 
a/src/java/org/apache/cassandra/streaming/DataMovementVerbHandler.java 
b/src/java/org/apache/cassandra/streaming/DataMovementVerbHandler.java
index 2b1d791ac7..9415f89285 100644
--- a/src/java/org/apache/cassandra/streaming/DataMovementVerbHandler.java
+++ b/src/java/org/apache/cassandra/streaming/DataMovementVerbHandler.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.NoPayload;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.tcm.ClusterMetadata;
 
 public class DataMovementVerbHandler implements IVerbHandler<DataMovement>
 {
@@ -47,8 +48,9 @@ public class DataMovementVerbHandler implements 
IVerbHandler<DataMovement>
     {
         MessagingService.instance().respond(NoPayload.noPayload, message); // 
let coordinator know we received the message
         StreamPlan streamPlan = new 
StreamPlan(StreamOperation.fromString(message.payload.streamOperation));
+        ClusterMetadata metadata = ClusterMetadata.current();
         Schema.instance.getNonLocalStrategyKeyspaces().stream().forEach((ksm) 
-> {
-            if (ksm.replicationStrategy.getReplicationFactor().allReplicas <= 
1)
+            if 
(metadata.placements.get(ksm.params.replication).writes.byEndpoint().keySet().size()
 <= 1)
                 return;
 
             
message.payload.movements.get(ksm.params.replication).asMap().forEach((local, 
endpoints) -> {
diff --git a/src/java/org/apache/cassandra/tcm/CMSOperations.java 
b/src/java/org/apache/cassandra/tcm/CMSOperations.java
index 30ee48b20d..87e6d0e372 100644
--- a/src/java/org/apache/cassandra/tcm/CMSOperations.java
+++ b/src/java/org/apache/cassandra/tcm/CMSOperations.java
@@ -31,7 +31,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.CMSPlacementStrategy;
 import org.apache.cassandra.schema.ReplicationParams;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.membership.NodeState;
@@ -45,6 +44,8 @@ import 
org.apache.cassandra.tcm.transformations.cms.AdvanceCMSReconfiguration;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MBeanWrapper;
 
+import static 
org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration.needsReconfiguration;
+
 public class CMSOperations implements CMSOperationsMBean
 {
     public static final String MBEAN_OBJECT_NAME = 
"org.apache.cassandra.tcm:type=CMSOperations";
@@ -142,8 +143,7 @@ public class CMSOperations implements CMSOperationsMBean
         ClusterMetadata metadata = ClusterMetadata.current();
         String members = 
metadata.fullCMSMembers().stream().sorted().map(Object::toString).collect(Collectors.joining(","));
         info.put(MEMBERS, members);
-        CMSPlacementStrategy placementStrategy = 
CMSPlacementStrategy.fromReplicationParams(ReplicationParams.meta(metadata), 
nodeId -> true);
-        info.put(NEEDS_RECONFIGURATION, 
Boolean.toString(placementStrategy.needsReconfiguration(metadata)));
+        info.put(NEEDS_RECONFIGURATION, 
Boolean.toString(needsReconfiguration(metadata)));
         info.put(IS_MEMBER, 
Boolean.toString(cms.isCurrentMember(FBUtilities.getBroadcastAddressAndPort())));
         info.put(SERVICE_STATE, 
ClusterMetadataService.state(metadata).toString());
         info.put(IS_MIGRATING, Boolean.toString(cms.isMigrating()));
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java 
b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
index 236e82392a..845b6741f9 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -41,6 +42,7 @@ import 
org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.ExceptionCode;
 import org.apache.cassandra.exceptions.StartupException;
+import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.io.util.FileInputStreamPlus;
 import org.apache.cassandra.io.util.FileOutputStreamPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -74,6 +76,7 @@ import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static java.util.stream.Collectors.toSet;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE;
 import static org.apache.cassandra.tcm.ClusterMetadataService.State.GOSSIP;
 import static org.apache.cassandra.tcm.ClusterMetadataService.State.LOCAL;
 import static org.apache.cassandra.tcm.ClusterMetadataService.State.REMOTE;
@@ -364,7 +367,13 @@ public class ClusterMetadataService
 
     public void reconfigureCMS(ReplicationParams replicationParams)
     {
-        Transformation transformation = new 
PrepareCMSReconfiguration.Complex(replicationParams);
+        ClusterMetadata metadata = ClusterMetadata.current();
+        Set<NodeId> downNodes = new HashSet<>();
+        for (InetAddressAndPort ep : metadata.directory.allJoinedEndpoints())
+            if (!FailureDetector.instance.isAlive(ep))
+                downNodes.add(metadata.directory.peerId(ep));
+        PrepareCMSReconfiguration.Complex transformation = new 
PrepareCMSReconfiguration.Complex(replicationParams, downNodes);
+        transformation.verify(metadata);
 
         ClusterMetadataService.instance()
                               .commit(transformation);
@@ -374,6 +383,13 @@ public class ClusterMetadataService
 
     public void ensureCMSPlacement(ClusterMetadata metadata)
     {
+        if (TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE.getBoolean())
+        {
+            logger.info("Not performing CMS reconfiguration as {} property is 
set. This should only be used for testing.",
+                        
TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE.getKey());
+            return;
+        }
+
         try
         {
             reconfigureCMS(ReplicationParams.meta(metadata));
diff --git a/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java 
b/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
index 1a96833ffb..fb6d9998c4 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.tcm.sequences;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -205,9 +206,15 @@ public class ReconfigureCMS extends 
MultiStepOperation<AdvanceCMSReconfiguration
     {
         if (!metadata.fullCMSMembers().contains(toRemove))
             return;
+        Set<NodeId> downNodes = new HashSet<>();
+        for (InetAddressAndPort ep : metadata.directory.allJoinedEndpoints())
+            if (!FailureDetector.instance.isAlive(ep))
+                downNodes.add(metadata.directory.peerId(ep));
 
+        PrepareCMSReconfiguration.Simple transformation = new 
PrepareCMSReconfiguration.Simple(metadata.directory.peerId(toRemove), 
downNodes);
+        transformation.verify(metadata);
         // We can force removal from the CMS as it doesn't alter the size of 
the service
-        ClusterMetadataService.instance().commit(new 
PrepareCMSReconfiguration.Simple(metadata.directory.peerId(toRemove)));
+        ClusterMetadataService.instance().commit(transformation);
 
         InProgressSequences.finishInProgressSequences(SequenceKey.instance);
         if (ClusterMetadata.current().isCMSMember(toRemove))
diff --git a/src/java/org/apache/cassandra/tcm/serialization/Version.java 
b/src/java/org/apache/cassandra/tcm/serialization/Version.java
index 7960c963ed..108348a504 100644
--- a/src/java/org/apache/cassandra/tcm/serialization/Version.java
+++ b/src/java/org/apache/cassandra/tcm/serialization/Version.java
@@ -40,6 +40,7 @@ public enum Version
     V2(2),
     /**
      *  - Serialize allowAutoSnapshot and incrementalBackups when serializing 
TableParams
+     *  - Serialize down nodes in PrepareCMSReconfiguration
      */
     V3(3),
 
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java
 
b/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java
index 2d41fb53a6..c8b15c4fa5 100644
--- 
a/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java
+++ 
b/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java
@@ -20,10 +20,15 @@ package org.apache.cassandra.tcm.transformations.cms;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
@@ -33,6 +38,8 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.CMSPlacementStrategy;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicationFactor;
 import org.apache.cassandra.schema.DistributedSchema;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
@@ -49,13 +56,30 @@ import org.apache.cassandra.tcm.serialization.Version;
 
 import static org.apache.cassandra.exceptions.ExceptionCode.INVALID;
 import static org.apache.cassandra.locator.MetaStrategy.entireRange;
+import static org.apache.cassandra.tcm.CMSOperations.REPLICATION_FACTOR;
 
-public class PrepareCMSReconfiguration
+public abstract class PrepareCMSReconfiguration implements Transformation
 {
     private static final Logger logger = 
LoggerFactory.getLogger(PrepareCMSReconfiguration.class);
+    final Set<NodeId> downNodes;
 
-    private static Transformation.Result executeInternal(ClusterMetadata prev, 
Function<ClusterMetadata.Transformer, ClusterMetadata.Transformer> transform, 
Diff diff)
+    public PrepareCMSReconfiguration(Set<NodeId> downNodes)
     {
+        this.downNodes = downNodes;
+    }
+
+    protected abstract Predicate<NodeId> 
additionalFilteringPredicate(Set<NodeId> downNodes);
+    protected abstract ReplicationParams newReplicationParams(ClusterMetadata 
prev);
+
+    protected Transformation.Result executeInternal(ClusterMetadata prev, 
Function<ClusterMetadata.Transformer, ClusterMetadata.Transformer> transform)
+    {
+        Diff diff = getDiff(prev);
+        if (!diff.hasChanges())
+        {
+            logger.info("Proposed CMS reconfiguration resulted in no required 
modifications at epoch {}", prev.epoch.getEpoch());
+            return Transformation.success(prev.transformer(), 
LockedRanges.AffectedRanges.EMPTY);
+        }
+
         LockedRanges.Key lockKey = LockedRanges.keyFor(prev.nextEpoch());
         Set<NodeId> cms = 
prev.fullCMSMembers().stream().map(prev.directory::peerId).collect(Collectors.toSet());
         Set<NodeId> tmp = new HashSet<>(cms);
@@ -71,17 +95,81 @@ public class PrepareCMSReconfiguration
         return Transformation.success(transform.apply(transformer), 
LockedRanges.AffectedRanges.EMPTY);
     }
 
-    public static class Simple implements Transformation
+    private Diff getDiff(ClusterMetadata prev)
+    {
+        Set<NodeId> currentCms = prev.fullCMSMemberIds();
+        Map<String, Integer> dcRF = extractRf(newReplicationParams(prev));
+        Set<NodeId> newCms = prepareNewCMS(dcRF, prev);
+        if (newCms.equals(currentCms))
+            return Diff.NOCHANGE;
+        return diff(currentCms, newCms);
+    }
+
+    private Set<NodeId> prepareNewCMS(Map<String, Integer> dcRf, 
ClusterMetadata prev)
+    {
+        CMSPlacementStrategy placementStrategy = new 
CMSPlacementStrategy(dcRf, additionalFilteringPredicate(downNodes));
+        return placementStrategy.reconfigure(prev);
+    }
+
+    public void verify(ClusterMetadata prev)
+    {
+        Map<String, Integer> dcRf = extractRf(newReplicationParams(prev));
+        int expectedSize = 
dcRf.values().stream().mapToInt(Integer::intValue).sum();
+        Set<NodeId> newCms = prepareNewCMS(dcRf, prev);
+        if (newCms.size() < (expectedSize / 2) + 1)
+            throw new IllegalStateException("Too many nodes are currently DOWN 
to safely perform the reconfiguration");
+    }
+
+    private static void serializeDownNodes(PrepareCMSReconfiguration 
transformation, DataOutputPlus out, Version version) throws IOException
+    {
+        out.writeUnsignedVInt32(transformation.downNodes.size());
+        for (NodeId nodeId : transformation.downNodes)
+            NodeId.serializer.serialize(nodeId, out, version);
+    }
+
+    private static Set<NodeId> deserializeDownNodes(DataInputPlus in, Version 
version) throws IOException
+    {
+        Set<NodeId> downNodes = new HashSet<>();
+        int count = in.readUnsignedVInt32();
+        for (int i = 0; i < count; i++)
+            downNodes.add(NodeId.serializer.deserialize(in, version));
+        return downNodes;
+    }
+
+    private static long serializedDownNodesSize(PrepareCMSReconfiguration 
transformation, Version version)
+    {
+        long size = 
TypeSizes.sizeofUnsignedVInt(transformation.downNodes.size());
+        for (NodeId nodeId : transformation.downNodes)
+            size += NodeId.serializer.serializedSize(nodeId, version);
+        return size;
+    }
+
+    public static class Simple extends PrepareCMSReconfiguration
     {
         public static final Simple.Serializer serializer = new Serializer();
 
         private final NodeId toReplace;
 
-        public Simple(NodeId toReplace)
+        public Simple(NodeId toReplace, Set<NodeId> downNodes)
         {
+            super(downNodes);
             this.toReplace = toReplace;
         }
 
+        @Override
+        protected Predicate<NodeId> additionalFilteringPredicate(Set<NodeId> 
downNodes)
+        {
+            // exclude the node being replaced from the new CMS, and avoid any 
down nodes
+            return nodeId -> !nodeId.equals(toReplace) && 
!downNodes.contains(nodeId);
+        }
+
+        @Override
+        protected ReplicationParams newReplicationParams(ClusterMetadata prev)
+        {
+            // a simple reconfiguration retains the existing replication params
+            return ReplicationParams.meta(prev);
+        }
+
         @Override
         public Kind kind()
         {
@@ -94,27 +182,16 @@ public class PrepareCMSReconfiguration
             if 
(!prev.fullCMSMembers().contains(prev.directory.getNodeAddresses(toReplace).broadcastAddress))
                 return new Rejected(INVALID, String.format("%s is not a member 
of CMS. Members: %s", toReplace, prev.fullCMSMembers()));
 
-            ReplicationParams metaParams = ReplicationParams.meta(prev);
-            CMSPlacementStrategy placementStrategy = 
CMSPlacementStrategy.fromReplicationParams(metaParams, nodeId -> 
!nodeId.equals(toReplace));
-            Set<NodeId> currentCms = prev.fullCMSMembers()
-                                         .stream()
-                                         .map(prev.directory::peerId)
-                                         .collect(Collectors.toSet());
-
-            Set<NodeId> newCms = placementStrategy.reconfigure(prev);
-            if (newCms.equals(currentCms))
-            {
-                logger.info("Proposed CMS reconfiguration resulted in no 
required modifications at epoch {}", prev.epoch.getEpoch());
-                return Transformation.success(prev.transformer(), 
LockedRanges.AffectedRanges.EMPTY);
-            }
-            Diff diff = diff(currentCms, newCms);
-            return executeInternal(prev, t -> t, diff);
+            // A simple reconfiguration only kicks off the sequence of 
membership changes, no additional metadata
+            // transformation is required.
+            return executeInternal(prev, t -> t);
         }
 
         public String toString()
         {
             return "PrepareCMSReconfiguration#Simple{" +
                    "toReplace=" + toReplace +
+                   ", downNodes=" + downNodes +
                    '}';
         }
 
@@ -122,34 +199,58 @@ public class PrepareCMSReconfiguration
         {
             public void serialize(Transformation t, DataOutputPlus out, 
Version version) throws IOException
             {
-                Simple tranformation = (Simple) t;
-                NodeId.serializer.serialize(tranformation.toReplace, out, 
version);
+                Simple transformation = (Simple) t;
+                NodeId.serializer.serialize(transformation.toReplace, out, 
version);
+                if (version.isAtLeast(Version.V3))
+                    
PrepareCMSReconfiguration.serializeDownNodes(transformation, out, version);
             }
 
             public Simple deserialize(DataInputPlus in, Version version) 
throws IOException
             {
-                return new Simple(NodeId.serializer.deserialize(in, version));
+                NodeId replaceNode = NodeId.serializer.deserialize(in, 
version);
+                Set<NodeId> downNodes = version.isAtLeast(Version.V3)
+                                        ? 
PrepareCMSReconfiguration.deserializeDownNodes(in, version)
+                                        : Collections.emptySet();
+                return new Simple(replaceNode, downNodes);
             }
 
             public long serializedSize(Transformation t, Version version)
             {
-                Simple tranformation = (Simple) t;
-                return 
NodeId.serializer.serializedSize(tranformation.toReplace, version);
+                Simple transformation = (Simple) t;
+                long size = 
NodeId.serializer.serializedSize(transformation.toReplace, version);
+                if (version.isAtLeast(Version.V3))
+                    size += 
PrepareCMSReconfiguration.serializedDownNodesSize(transformation, version);
+                return size;
             }
         }
     }
 
-    public static class Complex implements Transformation
+    public static class Complex extends PrepareCMSReconfiguration
     {
         public static final Complex.Serializer serializer = new 
Complex.Serializer();
 
         private final ReplicationParams replicationParams;
 
-        public Complex(ReplicationParams replicationParams)
+        public Complex(ReplicationParams replicationParams, Set<NodeId> 
downNodes)
         {
+            super(downNodes);
             this.replicationParams = replicationParams;
         }
 
+        @Override
+        protected Predicate<NodeId> additionalFilteringPredicate(Set<NodeId> 
downNodes)
+        {
+            // exclude any down nodes
+            return nodeId -> !downNodes.contains(nodeId);
+        }
+
+        @Override
+        protected ReplicationParams newReplicationParams(ClusterMetadata 
ignored)
+        {
+            // desired replication params are supplied, so just return them
+            return replicationParams;
+        }
+
         @Override
         public Kind kind()
         {
@@ -159,34 +260,21 @@ public class PrepareCMSReconfiguration
         @Override
         public Result execute(ClusterMetadata prev)
         {
+            // In a complex reconfiguration, in addition to initiating the 
sequence of membership changes,
+            // we're modifying the replication params of the metadata keyspace 
so we supply a function to do that
             KeyspaceMetadata keyspace = 
prev.schema.getKeyspaceMetadata(SchemaConstants.METADATA_KEYSPACE_NAME);
             KeyspaceMetadata newKeyspace = keyspace.withSwapped(new 
KeyspaceParams(keyspace.params.durableWrites, replicationParams));
 
-            CMSPlacementStrategy placementStrategy = 
CMSPlacementStrategy.fromReplicationParams(replicationParams, nodeId -> true);
-
-            Set<NodeId> currentCms = prev.fullCMSMembers()
-                                         .stream()
-                                         .map(prev.directory::peerId)
-                                         .collect(Collectors.toSet());
-
-            Set<NodeId> newCms = placementStrategy.reconfigure(prev);
-            if (newCms.equals(currentCms))
-            {
-                logger.info("Proposed CMS reconfiguration resulted in no 
required modifications at epoch {}", prev.epoch.getEpoch());
-                return Transformation.success(prev.transformer(), 
LockedRanges.AffectedRanges.EMPTY);
-            }
-            Diff diff = diff(currentCms, newCms);
-
             return executeInternal(prev,
-                                   transformer -> 
transformer.with(prev.placements.replaceParams(prev.nextEpoch(),ReplicationParams.meta(prev),
 replicationParams))
-                                                             .with(new 
DistributedSchema(prev.schema.getKeyspaces().withAddedOrUpdated(newKeyspace))),
-                                   diff);
+                                   transformer -> 
transformer.with(prev.placements.replaceParams(prev.nextEpoch(), 
ReplicationParams.meta(prev), replicationParams))
+                                                             .with(new 
DistributedSchema(prev.schema.getKeyspaces().withAddedOrUpdated(newKeyspace))));
         }
 
         public String toString()
         {
             return "PrepareCMSReconfiguration#Complex{" +
                    "replicationParams=" + replicationParams +
+                   ", downNodes=" + downNodes +
                    '}';
         }
 
@@ -194,19 +282,28 @@ public class PrepareCMSReconfiguration
         {
             public void serialize(Transformation t, DataOutputPlus out, 
Version version) throws IOException
             {
-                Complex tranformation = (Complex) t;
-                
ReplicationParams.serializer.serialize(tranformation.replicationParams, out, 
version);
+                Complex transformation = (Complex) t;
+                
ReplicationParams.serializer.serialize(transformation.replicationParams, out, 
version);
+                if (version.isAtLeast(Version.V3))
+                    
PrepareCMSReconfiguration.serializeDownNodes(transformation, out, version);
             }
 
             public Complex deserialize(DataInputPlus in, Version version) 
throws IOException
             {
-                return new 
Complex(ReplicationParams.serializer.deserialize(in, version));
+                ReplicationParams params = 
ReplicationParams.serializer.deserialize(in, version);
+                Set<NodeId> downNodes = version.isAtLeast(Version.V3)
+                                        ? 
PrepareCMSReconfiguration.deserializeDownNodes(in, version)
+                                        : Collections.emptySet();
+                return new Complex(params, downNodes);
             }
 
             public long serializedSize(Transformation t, Version version)
             {
-                Complex tranformation = (Complex) t;
-                return 
ReplicationParams.serializer.serializedSize(tranformation.replicationParams, 
version);
+                Complex transformation = (Complex) t;
+                long size = 
ReplicationParams.serializer.serializedSize(transformation.replicationParams, 
version);
+                if (version.isAtLeast(Version.V3))
+                    size += 
PrepareCMSReconfiguration.serializedDownNodesSize(transformation, version);
+                return size;
             }
         }
     }
@@ -233,8 +330,51 @@ public class PrepareCMSReconfiguration
         return new Diff(additions, removals);
     }
 
+    private static Map<String, Integer> extractRf(ReplicationParams params)
+    {
+        if (params.isMeta())
+        {
+            assert !params.options.containsKey(REPLICATION_FACTOR);
+            Map<String, Integer> dcRf = new HashMap<>();
+            for (Map.Entry<String, String> entry : params.options.entrySet())
+            {
+                String dc = entry.getKey();
+                ReplicationFactor rf = 
ReplicationFactor.fromString(entry.getValue());
+                dcRf.put(dc, rf.fullReplicas);
+            }
+            return dcRf;
+        }
+        else
+        {
+            throw new IllegalStateException("Can't parse the params: " + 
params);
+        }
+    }
+
+    public static boolean needsReconfiguration(ClusterMetadata metadata)
+    {
+        Map<String, Integer> dcRf = 
extractRf(ReplicationParams.meta(metadata));
+        Set<NodeId> currentCms = metadata.fullCMSMembers()
+                                         .stream()
+                                         .map(metadata.directory::peerId)
+                                         .collect(Collectors.toSet());
+        int expectedSize = 
dcRf.values().stream().mapToInt(Integer::intValue).sum();
+        if (currentCms.size() != expectedSize)
+            return true;
+        for (Map.Entry<String, Integer> dcRfEntry : dcRf.entrySet())
+        {
+            Collection<InetAddressAndPort> nodesInDc = 
metadata.directory.allDatacenterEndpoints().get(dcRfEntry.getKey());
+            if (nodesInDc.size() < dcRfEntry.getValue())
+                return true;
+        }
+
+        CMSPlacementStrategy placementStrategy = new 
CMSPlacementStrategy(dcRf, nodeId -> true);
+        Set<NodeId> newCms = placementStrategy.reconfigure(metadata);
+        return !currentCms.equals(newCms);
+    }
+
     public static class Diff
     {
+        public static final Diff NOCHANGE = new Diff(Collections.emptyList(), 
Collections.emptyList());
         public static final Serializer serializer = new Serializer();
 
         public final List<NodeId> additions;
@@ -246,6 +386,11 @@ public class PrepareCMSReconfiguration
             this.removals = removals;
         }
 
+        public boolean hasChanges()
+        {
+            return this != NOCHANGE;
+        }
+
         public String toString()
         {
             return "Diff{" +
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
index 80da569783..a08ba58a06 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
-
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -31,7 +30,6 @@ import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.Cluster;
@@ -43,7 +41,6 @@ import org.apache.cassandra.distributed.api.IMessageFilters;
 import org.apache.cassandra.distributed.shared.ClusterUtils;
 import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
 import org.apache.cassandra.exceptions.CasWriteTimeoutException;
-
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.SimpleSeedProvider;
 import org.apache.cassandra.net.Verb;
@@ -54,9 +51,11 @@ import 
org.apache.cassandra.tcm.sequences.InProgressSequences;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.PAXOS_USE_SELF_EXECUTION;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.TCM_USE_ATOMIC_LONG_PROCESSOR;
 import static org.apache.cassandra.distributed.api.ConsistencyLevel.ANY;
-import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
 import static 
org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_QUORUM;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
 import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
 import static org.apache.cassandra.distributed.api.ConsistencyLevel.SERIAL;
 import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
@@ -76,10 +75,6 @@ import static org.junit.Assert.fail;
 
 public class CASTest extends CASCommonTestCases
 {
-    static
-    {
-        
CassandraRelevantProperties.TCM_USE_ATOMIC_LONG_PROCESSOR.setBoolean(true);
-    }
 
     /**
      * The {@code cas_contention_timeout} used during the tests
@@ -98,6 +93,8 @@ public class CASTest extends CASCommonTestCases
     public static void beforeClass() throws Throwable
     {
         PAXOS_USE_SELF_EXECUTION.setBoolean(false);
+        TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE.setBoolean(true);
+        TCM_USE_ATOMIC_LONG_PROCESSOR.setBoolean(true);
         TestBaseImpl.beforeClass();
         // At times during these tests, node1 is going to be blocked from 
appending entries to its local metadata
         // log in order to induce divergent views of cluster topology between 
instances. This precludes it from
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
index b3dbdf796d..d06d79afd1 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
@@ -827,7 +827,7 @@ public class ClusterMetadataTestHelper
 
     public static void reconfigureCms(ReplicationParams replication)
     {
-        ClusterMetadata metadata = 
ClusterMetadataService.instance().commit(new 
PrepareCMSReconfiguration.Complex(replication));
+        ClusterMetadata metadata = 
ClusterMetadataService.instance().commit(new 
PrepareCMSReconfiguration.Complex(replication, Collections.emptySet()));
         while 
(metadata.inProgressSequences.contains(ReconfigureCMS.SequenceKey.instance))
         {
             AdvanceCMSReconfiguration next = ((ReconfigureCMS) 
metadata.inProgressSequences.get(ReconfigureCMS.SequenceKey.instance)).next;
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java
index 89253b72b8..931ee320e5 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java
@@ -529,7 +529,7 @@ public class MetadataChangeSimulationTest extends 
CMSTestBase
             for (String s : ntsRf.asMap().keySet())
                 cmsRf.put(s, 3);
 
-            simulateBounces(ntsRf, new 
CMSPlacementStrategy.DatacenterAware(cmsRf, (cm, n) -> random.nextInt(10) > 1), 
random);
+            simulateBounces(ntsRf, new CMSPlacementStrategy(cmsRf, (cm, n) -> 
random.nextInt(10) > 1), random);
         }
     }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSStreamingTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSStreamingTest.java
new file mode 100644
index 0000000000..909e0a46d3
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSStreamingTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.log;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.sequences.AddToCMS;
+import org.apache.cassandra.tcm.transformations.cms.RemoveFromCMS;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class ReconfigureCMSStreamingTest extends TestBaseImpl
+{
+    @Test
+    public void testRF1() throws IOException
+    {
+        /*
+        1. place the CMS on the "wrong" node (2)
+        2. execute a bunch of schema changes
+        3. run "cms reconfigure 1" which moves the cms back to the correct 
node (1)
+        4. make sure node1 has all the epochs that node2 had before 
reconfiguration
+         */
+        try (Cluster cluster = init(Cluster.build()
+                                           .withNodes(3)
+                                           .withConfig(c -> 
c.with(Feature.GOSSIP, Feature.NETWORK))
+                                           .start()))
+        {
+            cluster.get(2).runOnInstance(() -> AddToCMS.initiate());
+            cluster.get(1).runOnInstance(() -> 
ClusterMetadataService.instance().commit(new 
RemoveFromCMS(InetAddressAndPort.getByNameUnchecked("127.0.0.1"), true)));
+
+            for (int i = 0; i < 20; i++)
+                cluster.schemaChange(withKeyspace("create table %s.tbl"+i+" 
(id int primary key)"));
+
+            long[] epochsBefore = 
epochs(cluster.get(2).executeInternal("select * from 
system_cluster_metadata.distributed_metadata_log"));
+            cluster.get(3).nodetoolResult("cms", "reconfigure", "1");
+            long[] epochsAfter = epochs(cluster.get(1).executeInternal("select 
epoch from system_cluster_metadata.distributed_metadata_log"));
+            assertTrue(epochsBefore.length > 20); // at least 20 schema 
changes above
+            assertTrue(epochsAfter.length > epochsBefore.length); // we get a 
few more epochs from reconfiguration
+            for (int i = 0; i < epochsBefore.length; i++)
+                assertEquals(epochsBefore[i] + " != " + epochsAfter[i], 
epochsBefore[i], epochsAfter[i]);
+        }
+    }
+
+    private long[] epochs(Object[][] res)
+    {
+        long[] epochs = new long[res.length];
+        for (int i = 0; i < res.length; i++)
+            epochs[i] = (long)res[i][0];
+        Arrays.sort(epochs);
+        return epochs;
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
index e8713ae35f..d1f983f19a 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
@@ -18,7 +18,12 @@
 
 package org.apache.cassandra.distributed.test.log;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
 
 import org.junit.Assert;
@@ -27,6 +32,9 @@ import org.junit.Test;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.locator.MetaStrategy;
 import org.apache.cassandra.schema.DistributedMetadataLogKeyspace;
@@ -40,6 +48,15 @@ import org.apache.cassandra.tcm.sequences.ReconfigureCMS;
 import org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart;
+import static 
org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
+import static 
org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.psjava.util.AssertStatus.assertTrue;
+
 public class ReconfigureCMSTest extends FuzzTestBase
 {
     @Test
@@ -59,8 +76,8 @@ public class ReconfigureCMSTest extends FuzzTestBase
             cluster.get(nodeSelector.get()).nodetoolResult("cms", 
"reconfigure", "5").asserts().success();
             cluster.get(1).runOnInstance(() -> {
                 ClusterMetadata metadata = ClusterMetadata.current();
-                Assert.assertEquals(5, metadata.fullCMSMembers().size());
-                Assert.assertEquals(ReplicationParams.simpleMeta(5, 
metadata.directory.knownDatacenters()),
+                assertEquals(5, metadata.fullCMSMembers().size());
+                assertEquals(ReplicationParams.simpleMeta(5, 
metadata.directory.knownDatacenters()),
                                     
metadata.placements.keys().stream().filter(ReplicationParams::isMeta).findFirst().get());
             });
             cluster.stream().forEach(i -> {
@@ -70,8 +87,8 @@ public class ReconfigureCMSTest extends FuzzTestBase
             cluster.get(nodeSelector.get()).nodetoolResult("cms", 
"reconfigure", "1").asserts().success();
             cluster.get(1).runOnInstance(() -> {
                 ClusterMetadata metadata = ClusterMetadata.current();
-                Assert.assertEquals(1, metadata.fullCMSMembers().size());
-                Assert.assertEquals(ReplicationParams.simpleMeta(1, 
metadata.directory.knownDatacenters()),
+                assertEquals(1, metadata.fullCMSMembers().size());
+                assertEquals(ReplicationParams.simpleMeta(1, 
metadata.directory.knownDatacenters()),
                                     
metadata.placements.keys().stream().filter(ReplicationParams::isMeta).findFirst().get());
             });
         }
@@ -89,7 +106,7 @@ public class ReconfigureCMSTest extends FuzzTestBase
         {
             cluster.get(1).nodetoolResult("cms", "reconfigure", 
"2").asserts().success();
             cluster.get(1).runOnInstance(() -> {
-                ClusterMetadataService.instance().commit(new 
PrepareCMSReconfiguration.Complex(ReplicationParams.simple(3).asMeta()));
+                ClusterMetadataService.instance().commit(new 
PrepareCMSReconfiguration.Complex(ReplicationParams.simple(3).asMeta(), 
Collections.emptySet()));
                 ReconfigureCMS reconfigureCMS = (ReconfigureCMS) 
ClusterMetadata.current().inProgressSequences.get(ReconfigureCMS.SequenceKey.instance);
                 ClusterMetadataService.instance().commit(reconfigureCMS.next);
                 
ProgressBarrier.propagateLast(MetaStrategy.affectedRanges(ClusterMetadata.current()));
@@ -110,15 +127,15 @@ public class ReconfigureCMSTest extends FuzzTestBase
                 
ProgressBarrier.propagateLast(MetaStrategy.affectedRanges(ClusterMetadata.current()));
                 ClusterMetadata metadata = ClusterMetadata.current();
                 
Assert.assertNull(metadata.inProgressSequences.get(ReconfigureCMS.SequenceKey.instance));
-                Assert.assertEquals(2, metadata.fullCMSMembers().size());
+                assertEquals(2, metadata.fullCMSMembers().size());
                 ReplicationParams params = ReplicationParams.meta(metadata);
                 DataPlacement placements = metadata.placements.get(params);
-                Assert.assertEquals(placements.reads, placements.writes);
-                Assert.assertEquals(metadata.fullCMSMembers().size(), 
Integer.parseInt(params.asMap().get("dc0")));
+                assertEquals(placements.reads, placements.writes);
+                assertEquals(metadata.fullCMSMembers().size(), 
Integer.parseInt(params.asMap().get("dc0")));
             });
 
             cluster.get(1).runOnInstance(() -> {
-                ClusterMetadataService.instance().commit(new 
PrepareCMSReconfiguration.Complex(ReplicationParams.simple(4).asMeta()));
+                ClusterMetadataService.instance().commit(new 
PrepareCMSReconfiguration.Complex(ReplicationParams.simple(4).asMeta(), 
Collections.emptySet()));
                 
ProgressBarrier.propagateLast(MetaStrategy.affectedRanges(ClusterMetadata.current()));
 
                 ReconfigureCMS reconfigureCMS = (ReconfigureCMS) 
ClusterMetadata.current().inProgressSequences.get(ReconfigureCMS.SequenceKey.instance);
@@ -136,10 +153,116 @@ public class ReconfigureCMSTest extends FuzzTestBase
                 ClusterMetadata metadata = ClusterMetadata.current();
                 
Assert.assertNull(metadata.inProgressSequences.get(ReconfigureCMS.SequenceKey.instance));
                 
Assert.assertTrue(metadata.fullCMSMembers().contains(FBUtilities.getBroadcastAddressAndPort()));
-                Assert.assertEquals(3, metadata.fullCMSMembers().size());
+                assertEquals(3, metadata.fullCMSMembers().size());
                 DataPlacement placements = 
metadata.placements.get(ReplicationParams.meta(metadata));
-                Assert.assertEquals(placements.reads, placements.writes);
+                assertEquals(placements.reads, placements.writes);
+            });
+        }
+    }
+
+    @Test
+    public void testReconfigureTooManyNodesDown() throws IOException, 
ExecutionException, InterruptedException
+    {
+        try (Cluster cluster = init(Cluster.build(3)
+                                           .withConfig(conf -> 
conf.with(Feature.NETWORK, Feature.GOSSIP))
+                                      .start()))
+        {
+            cluster.get(2).shutdown().get();
+            cluster.get(3).shutdown().get();
+            // Fails as the CMS size would be less than a quorum of what was 
specified (i.e. 3/2 + 1)
+            cluster.get(1).nodetoolResult("cms", "reconfigure", 
"3").asserts().failure();
+            cluster.get(2).startup();
+            cluster.get(1).runOnInstance(() -> assertEquals(1, 
ClusterMetadata.current().fullCMSMembers().size()));
+
+            // Succeeds, but flags that a further reconfiguration is required
+            cluster.get(1).nodetoolResult("cms", "reconfigure", 
"3").asserts().success();
+            cluster.get(1).runOnInstance(() -> assertEquals(2, 
ClusterMetadata.current().fullCMSMembers().size()));
+            cluster.get(1).runOnInstance(() -> 
assertTrue(PrepareCMSReconfiguration.needsReconfiguration(ClusterMetadata.current())));
+
+            // All good
+            cluster.get(3).startup();
+            cluster.get(1).nodetoolResult("cms", "reconfigure", 
"3").asserts().success();
+            cluster.get(1).runOnInstance(() -> assertEquals(3, 
ClusterMetadata.current().fullCMSMembers().size()));
+            cluster.get(1).runOnInstance(() -> 
assertFalse(PrepareCMSReconfiguration.needsReconfiguration(ClusterMetadata.current())));
+        }
+    }
+
+    @Test
+    public void testReplaceSameSize() throws IOException, ExecutionException, 
InterruptedException
+    {
+        TokenSupplier even = TokenSupplier.evenlyDistributedTokens(3);
+        try (Cluster cluster = init(Cluster.build(3)
+                                           .withConfig(c -> 
c.with(Feature.GOSSIP, Feature.NETWORK))
+                                           .withTokenSupplier(node -> 
even.token(node == 4 ? 2 : node))
+                                           .start()))
+        {
+            cluster.get(1).nodetoolResult("cms", "reconfigure", 
"3").asserts().success();
+            cluster.get(2).shutdown().get();
+            // now create a new node to replace the other node
+            IInvokableInstance replacingNode = replaceHostAndStart(cluster, 
cluster.get(2), props -> {
+                // since we have a downed host, an old schema version might 
show up but
+                // can't be fetched since the host is down...
+                props.set(BOOTSTRAP_SKIP_SCHEMA_CHECK, true);
+            });
+            // wait till the replacing node is in the ring
+            awaitRingJoin(cluster.get(1), replacingNode);
+            awaitRingJoin(replacingNode, cluster.get(1));
+            replacingNode.runOnInstance(() -> {
+                ClusterMetadata metadata = ClusterMetadata.current();
+                
assertTrue(metadata.isCMSMember(FBUtilities.getBroadcastAddressAndPort()));
+                assertEquals(3, metadata.fullCMSMembers().size());
             });
         }
     }
+
+    @Test
+    public void testReconfigurePickAliveNodesIfPossible() throws Exception
+    {
+        try (Cluster cluster = init(Cluster.build(5)
+                                           .withConfig(conf -> 
conf.with(Feature.NETWORK, Feature.GOSSIP))
+                                           .start()))
+        {
+            cluster.get(2).shutdown().get();
+            cluster.get(3).shutdown().get();
+            cluster.get(1).nodetoolResult("cms", "reconfigure", 
"3").asserts().success();
+            cluster.get(2).startup();
+            cluster.get(3).startup();
+
+            Set<String> expectedCMSMembers = expectedCMS(cluster, 1, 4, 5);
+            cluster.forEach(inst -> assertEquals(expectedCMSMembers, 
ClusterUtils.getCMSMembers(inst)));
+        }
+    }
+
+    @Test
+    public void testReconfigurationViolatesRackDiversityIfNecessary() throws 
Exception
+    {
+        // rack1: node1, node3
+        // rack2: node2
+        // rack4: node4
+        // ideal placement for CMS is 1, 2, 4 but if 2 is down, violate rack 
diversity and pick 1, 3, 4
+        try (Cluster cluster = init(Cluster.build(4)
+                                           
.withNodeIdTopology(networkTopology(4, (nodeid) -> nodeid % 2 == 1 ? 
dcAndRack("dc1", "rack1")
+                                                                               
                               : dcAndRack("dc1", "rack" + nodeid)))
+                                           .withConfig(conf -> 
conf.with(Feature.NETWORK, Feature.GOSSIP))
+                                           .start()))
+        {
+            cluster.get(1).nodetoolResult("cms", "reconfigure", 
"3").asserts().success();
+            Set<String> rackDiverse = expectedCMS(cluster, 1, 2, 4);
+            cluster.forEach(inst -> assertEquals(rackDiverse, 
ClusterUtils.getCMSMembers(inst)));
+            cluster.get(2).shutdown().get();
+            cluster.get(1).nodetoolResult("cms", "reconfigure", 
"3").asserts().success();
+            cluster.get(2).startup();
+            Set<String> notRackDiverse = expectedCMS(cluster, 1, 4, 3);
+            cluster.forEach(inst -> assertEquals(notRackDiverse, 
ClusterUtils.getCMSMembers(inst)));
+        }
+    }
+
+    // We can't assume that nodeId matches endpoint (i.e. node3 = 127.0.0.3 etc.)
+    private Set<String> expectedCMS(Cluster cluster, int... instanceIds)
+    {
+        Set<String> expectedCMSMembers = new HashSet<>(instanceIds.length);
+        for (int id : instanceIds)
+            
expectedCMSMembers.add(cluster.get(id).config().broadcastAddress().getAddress().toString());
+        return expectedCMSMembers;
+    }
 }
diff --git a/test/unit/org/apache/cassandra/locator/MetaStrategyTest.java 
b/test/unit/org/apache/cassandra/locator/MetaStrategyTest.java
index d90cbd2464..c6a96fa71e 100644
--- a/test/unit/org/apache/cassandra/locator/MetaStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/MetaStrategyTest.java
@@ -110,7 +110,7 @@ public class MetaStrategyTest
         rf.put("dc2", 2);
         rf.put("dc3", 2);
 
-        CMSPlacementStrategy placementStrategy = new 
CMSPlacementStrategy.DatacenterAware(rf, (cd, n) -> true);
+        CMSPlacementStrategy placementStrategy = new CMSPlacementStrategy(rf, 
(cd, n) -> true);
         Assert.assertEquals(nodeIds(metadata.directory,
                                     1, 2, 4, 5, 7, 8),
                             placementStrategy.reconfigure(metadata));
@@ -119,8 +119,8 @@ public class MetaStrategyTest
                                     1, 2, 4, 5, 7, 8),
                             placementStrategy.reconfigure(metadata));
 
-        placementStrategy = new CMSPlacementStrategy.DatacenterAware(rf, (cd, 
n) -> !n.equals(metadata.directory.peerId(addr(2).broadcastAddress)) &&
-                                                                               
     !n.equals(metadata.directory.peerId(addr(2).broadcastAddress)));
+        placementStrategy = new CMSPlacementStrategy(rf, (cd, n) -> 
!n.equals(metadata.directory.peerId(addr(2).broadcastAddress)) &&
+                                                                    
!n.equals(metadata.directory.peerId(addr(2).broadcastAddress)));
         Assert.assertEquals(nodeIds(metadata.directory,
                                     1, 3, 4, 5, 7, 8),
                             placementStrategy.reconfigure(metadata));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to