This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 279c0527aa Allow CMS reconfiguration to work around DOWN nodes 279c0527aa is described below commit 279c0527aa3d52e1474fee5f37c0227ed6f9da5f Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Thu Sep 26 15:56:03 2024 +0200 Allow CMS reconfiguration to work around DOWN nodes Patch by Sam Tunnicliffe and marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-19943 Co-authored-by: Sam Tunnicliffe <s...@apache.org> --- CHANGES.txt | 1 + .../config/CassandraRelevantProperties.java | 4 + .../cassandra/locator/CMSPlacementStrategy.java | 138 ++++-------- .../org/apache/cassandra/locator/MetaStrategy.java | 25 ++- .../org/apache/cassandra/metrics/TCMMetrics.java | 8 +- .../streaming/DataMovementVerbHandler.java | 4 +- .../org/apache/cassandra/tcm/CMSOperations.java | 6 +- .../cassandra/tcm/ClusterMetadataService.java | 18 +- .../cassandra/tcm/sequences/ReconfigureCMS.java | 9 +- .../cassandra/tcm/serialization/Version.java | 1 + .../cms/PrepareCMSReconfiguration.java | 243 ++++++++++++++++----- .../apache/cassandra/distributed/test/CASTest.java | 13 +- .../test/log/ClusterMetadataTestHelper.java | 2 +- .../test/log/MetadataChangeSimulationTest.java | 2 +- .../test/log/ReconfigureCMSStreamingTest.java | 78 +++++++ .../distributed/test/log/ReconfigureCMSTest.java | 145 +++++++++++- .../apache/cassandra/locator/MetaStrategyTest.java | 6 +- 17 files changed, 517 insertions(+), 186 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 252b9c919c..a69a7222ff 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Allow CMS reconfiguration to work around DOWN nodes (CASSANDRA-19943) * Make TableParams.Serializer set allowAutoSnapshots and incrementalBackups (CASSANDRA-19954) * Make sstabledump possible to show tombstones only (CASSANDRA-19939) * Ensure that RFP queries potentially stale replicas even with only key columns in the row filter (CASSANDRA-19938) diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index a0baf8de90..56a8aad315 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -533,6 +533,10 @@ public enum CassandraRelevantProperties */ TCM_RECENTLY_SEALED_PERIOD_INDEX_SIZE("cassandra.recently_sealed_period_index_size", "10"), + /** + * for testing purposes disable the automatic CMS reconfiguration after a bootstrap/replace/move operation + */ + TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE("cassandra.test.skip_cms_reconfig_after_topology_change", "false"), /** * should replica groups in data placements be sorted to ensure the primary replica is first in the list */ diff --git a/src/java/org/apache/cassandra/locator/CMSPlacementStrategy.java b/src/java/org/apache/cassandra/locator/CMSPlacementStrategy.java index 6ba47ae04d..716b40a34c 100644 --- a/src/java/org/apache/cassandra/locator/CMSPlacementStrategy.java +++ b/src/java/org/apache/cassandra/locator/CMSPlacementStrategy.java @@ -31,7 +31,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.schema.ReplicationParams; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.membership.Directory; @@ -39,123 +38,70 @@ import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.tcm.ownership.TokenMap; -import static org.apache.cassandra.locator.SimpleStrategy.REPLICATION_FACTOR; - /** * CMS Placement Strategy is how CMS keeps the number of its members at a configured level, given current * cluster topolgy. It allows to add and remove CMS members when cluster topology changes. For example, during - * node replacement or decommission. + * node replacement or decommission. This attempts to achieve rack diversity, while keeping CMS placements + * close to how "regular" data would get replicated to keep the bounces safe, as long as the user + * bounces at most `f` members of the replica group, where `f = (RF - 1)/2`. */ -public interface CMSPlacementStrategy +public class CMSPlacementStrategy { - Set<NodeId> reconfigure(ClusterMetadata metadata); - boolean needsReconfiguration(ClusterMetadata metadata); + public final Map<String, Integer> rf; + public final BiFunction<ClusterMetadata, NodeId, Boolean> filter; - static CMSPlacementStrategy fromReplicationParams(ReplicationParams params, Predicate<NodeId> filter) + public CMSPlacementStrategy(Map<String, Integer> rf, Predicate<NodeId> filter) { - if (params.isMeta()) - { - assert !params.options.containsKey(REPLICATION_FACTOR); - Map<String, Integer> dcRf = new HashMap<>(); - for (Map.Entry<String, String> entry : params.options.entrySet()) - { - String dc = entry.getKey(); - ReplicationFactor rf = ReplicationFactor.fromString(entry.getValue()); - dcRf.put(dc, rf.fullReplicas); - } - return new DatacenterAware(dcRf, filter); - } - else - { - throw new IllegalStateException("Can't parse the params: " + params); - } + this(rf, new DefaultNodeFilter(filter)); } - /** - * Default reconfiguration strategy: attempts to achieve rack diversity, while keeping CMS placements - * close to how "regular" data would get replicated to keep the bounces safe, as long as the user - * bounces at most `f` members of the replica group, where `f = RF/2`. - */ - class DatacenterAware implements CMSPlacementStrategy + @VisibleForTesting + public CMSPlacementStrategy(Map<String, Integer> rf, BiFunction<ClusterMetadata, NodeId, Boolean> filter) { - public final Map<String, Integer> rf; - public final BiFunction<ClusterMetadata, NodeId, Boolean> filter; - - public DatacenterAware(Map<String, Integer> rf, Predicate<NodeId> filter) - { - this(rf, new DefaultNodeFilter(filter)); - } - - @VisibleForTesting - public DatacenterAware(Map<String, Integer> rf, BiFunction<ClusterMetadata, NodeId, Boolean> filter) - { - this.rf = rf; - this.filter = filter; - } + // todo: verify only test uses with other filter + this.rf = rf; + this.filter = filter; + } - public Set<NodeId> reconfigure(ClusterMetadata metadata) + public Set<NodeId> reconfigure(ClusterMetadata metadata) + { + Map<String, ReplicationFactor> rf = new HashMap<>(this.rf.size()); + for (Map.Entry<String, Integer> e : this.rf.entrySet()) { - Map<String, ReplicationFactor> rf = new HashMap<>(this.rf.size()); - for (Map.Entry<String, Integer> e : this.rf.entrySet()) - { - Collection<InetAddressAndPort> nodesInDc = metadata.directory.allDatacenterEndpoints().get(e.getKey()); - if (nodesInDc.isEmpty()) - throw new IllegalStateException(String.format("There are no nodes in %s datacenter", e.getKey())); - if (nodesInDc.size() < e.getValue()) - throw new Transformation.RejectedTransformationException(String.format("There are not enough nodes in %s datacenter to satisfy replication factor", e.getKey())); + Collection<InetAddressAndPort> nodesInDc = metadata.directory.allDatacenterEndpoints().get(e.getKey()); + if (nodesInDc.isEmpty()) + throw new IllegalStateException(String.format("There are no nodes in %s datacenter", e.getKey())); + if (nodesInDc.size() < e.getValue()) + throw new Transformation.RejectedTransformationException(String.format("There are not enough nodes in %s datacenter to satisfy replication factor", e.getKey())); - rf.put(e.getKey(), ReplicationFactor.fullOnly(e.getValue())); - } - return reconfigure(metadata, rf); + rf.put(e.getKey(), ReplicationFactor.fullOnly(e.getValue())); } - public Set<NodeId> reconfigure(ClusterMetadata metadata, Map<String, ReplicationFactor> rf) + Directory tmpDirectory = metadata.directory; + TokenMap tmpTokenMap = metadata.tokenMap; + for (NodeId peerId : metadata.directory.peerIds()) { - Directory tmpDirectory = metadata.directory; - TokenMap tokenMap = metadata.tokenMap; - for (NodeId peerId : metadata.directory.peerIds()) + if (!filter.apply(metadata, peerId)) { - if (!filter.apply(metadata, peerId)) - { - tmpDirectory = tmpDirectory.without(peerId); - tokenMap = tokenMap.unassignTokens(peerId); - } + tmpDirectory = tmpDirectory.without(peerId); + tmpTokenMap = tmpTokenMap.unassignTokens(peerId); } - - // Although MetaStrategy has its own entireRange, it uses a custom partitioner which isn't compatible with - // regular, non-CMS placements. For that reason, we select replicas here using tokens provided by the - // globally configured partitioner. - Token minToken = DatabaseDescriptor.getPartitioner().getMinimumToken(); - EndpointsForRange endpoints = NetworkTopologyStrategy.calculateNaturalReplicas(minToken, - new Range<>(minToken, minToken), - tmpDirectory, - tokenMap, - rf); - - return endpoints.endpoints().stream().map(metadata.directory::peerId).collect(Collectors.toSet()); } - public boolean needsReconfiguration(ClusterMetadata metadata) - { - Map<String, ReplicationFactor> rf = new HashMap<>(this.rf.size()); - for (Map.Entry<String, Integer> e : this.rf.entrySet()) - { - Collection<InetAddressAndPort> nodesInDc = metadata.directory.allDatacenterEndpoints().get(e.getKey()); - if (nodesInDc.size() < e.getValue()) - return true; - rf.put(e.getKey(), ReplicationFactor.fullOnly(e.getValue())); - } - - Set<NodeId> currentCms = metadata.fullCMSMembers() - .stream() - .map(metadata.directory::peerId) - .collect(Collectors.toSet()); - Set<NodeId> newCms = reconfigure(metadata, rf); - return !currentCms.equals(newCms); - } + // Although MetaStrategy has its own entireRange, it uses a custom partitioner which isn't compatible with + // regular, non-CMS placements. For that reason, we select replicas here using tokens provided by the + // globally configured partitioner. + Token minToken = DatabaseDescriptor.getPartitioner().getMinimumToken(); + EndpointsForRange endpoints = NetworkTopologyStrategy.calculateNaturalReplicas(minToken, + new Range<>(minToken, minToken), + tmpDirectory, + tmpTokenMap, + rf); + + return endpoints.endpoints().stream().map(metadata.directory::peerId).collect(Collectors.toSet()); } - class DefaultNodeFilter implements BiFunction<ClusterMetadata, NodeId, Boolean> + static class DefaultNodeFilter implements BiFunction<ClusterMetadata, NodeId, Boolean> { private final Predicate<NodeId> filter; diff --git a/src/java/org/apache/cassandra/locator/MetaStrategy.java b/src/java/org/apache/cassandra/locator/MetaStrategy.java index 536ac2fa22..a7afa4898c 100644 --- a/src/java/org/apache/cassandra/locator/MetaStrategy.java +++ b/src/java/org/apache/cassandra/locator/MetaStrategy.java @@ -30,6 +30,8 @@ import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.ownership.DataPlacement; import org.apache.cassandra.tcm.sequences.LockedRanges; +import static org.apache.cassandra.locator.NetworkTopologyStrategy.REPLICATION_FACTOR; + /** * MetaStrategy is designed for distributed cluster metadata keyspace, and should not be used by * the users directly. This strategy allows a configurable number of nodes to own an entire range and @@ -55,9 +57,26 @@ public class MetaStrategy extends SystemStrategy return new Replica(addr, entireRange, true); } + private final ReplicationFactor rf; + public MetaStrategy(String keyspaceName, Map<String, String> configOptions) { super(keyspaceName, configOptions); + int replicas = 0; + if (configOptions != null) + { + for (Map.Entry<String, String> entry : configOptions.entrySet()) + { + String dc = entry.getKey(); + // prepareOptions should have transformed any "replication_factor" options by now + if (dc.equalsIgnoreCase(REPLICATION_FACTOR)) + continue; + ReplicationFactor rf = ReplicationFactor.fromString(entry.getValue()); + replicas += rf.allReplicas; + } + } + + rf = ReplicationFactor.fullOnly(replicas); } @Override @@ -75,11 +94,7 @@ public class MetaStrategy extends SystemStrategy @Override public ReplicationFactor getReplicationFactor() { - ClusterMetadata metadata = ClusterMetadata.currentNullable(); - if (metadata == null || metadata.epoch.isEqualOrBefore(Epoch.FIRST)) - return ReplicationFactor.fullOnly(1); - int rf = metadata.placements.get(ReplicationParams.meta(metadata)).writes.forRange(entireRange).get().byEndpoint.size(); - return ReplicationFactor.fullOnly(rf); + return rf; } @Override diff --git a/src/java/org/apache/cassandra/metrics/TCMMetrics.java b/src/java/org/apache/cassandra/metrics/TCMMetrics.java index e58a3b11cd..29858ead80 100644 --- a/src/java/org/apache/cassandra/metrics/TCMMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TCMMetrics.java @@ -25,13 +25,12 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; import org.apache.cassandra.gms.FailureDetector; -import org.apache.cassandra.locator.CMSPlacementStrategy; -import org.apache.cassandra.schema.ReplicationParams; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; +import static org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration.needsReconfiguration; public class TCMMetrics { @@ -96,10 +95,7 @@ public class TCMMetrics needsCMSReconfiguration = Metrics.register(factory.createMetricName("NeedsCMSReconfiguration"), () -> { ClusterMetadata metadata = ClusterMetadata.currentNullable(); - if (metadata == null) - return 0; - CMSPlacementStrategy placementStrategy = CMSPlacementStrategy.fromReplicationParams(ReplicationParams.meta(metadata), nodeId -> true); - return placementStrategy.needsReconfiguration(metadata) ? 1 : 0; + return metadata != null && needsReconfiguration(metadata) ? 1 : 0; }); fetchedPeerLogEntries = Metrics.histogram(factory.createMetricName("FetchedPeerLogEntries"), false); diff --git a/src/java/org/apache/cassandra/streaming/DataMovementVerbHandler.java b/src/java/org/apache/cassandra/streaming/DataMovementVerbHandler.java index 2b1d791ac7..9415f89285 100644 --- a/src/java/org/apache/cassandra/streaming/DataMovementVerbHandler.java +++ b/src/java/org/apache/cassandra/streaming/DataMovementVerbHandler.java @@ -36,6 +36,7 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.NoPayload; import org.apache.cassandra.net.Verb; import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.tcm.ClusterMetadata; public class DataMovementVerbHandler implements IVerbHandler<DataMovement> { @@ -47,8 +48,9 @@ public class DataMovementVerbHandler implements IVerbHandler<DataMovement> { MessagingService.instance().respond(NoPayload.noPayload, message); // let coordinator know we received the message StreamPlan streamPlan = new StreamPlan(StreamOperation.fromString(message.payload.streamOperation)); + ClusterMetadata metadata = ClusterMetadata.current(); Schema.instance.getNonLocalStrategyKeyspaces().stream().forEach((ksm) -> { - if (ksm.replicationStrategy.getReplicationFactor().allReplicas <= 1) + if (metadata.placements.get(ksm.params.replication).writes.byEndpoint().keySet().size() <= 1) return; message.payload.movements.get(ksm.params.replication).asMap().forEach((local, endpoints) -> { diff --git a/src/java/org/apache/cassandra/tcm/CMSOperations.java b/src/java/org/apache/cassandra/tcm/CMSOperations.java index 30ee48b20d..87e6d0e372 100644 --- a/src/java/org/apache/cassandra/tcm/CMSOperations.java +++ b/src/java/org/apache/cassandra/tcm/CMSOperations.java @@ -31,7 +31,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.locator.CMSPlacementStrategy; import org.apache.cassandra.schema.ReplicationParams; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; @@ -45,6 +44,8 @@ import org.apache.cassandra.tcm.transformations.cms.AdvanceCMSReconfiguration; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MBeanWrapper; +import static org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration.needsReconfiguration; + public class CMSOperations implements CMSOperationsMBean { public static final String MBEAN_OBJECT_NAME = "org.apache.cassandra.tcm:type=CMSOperations"; @@ -142,8 +143,7 @@ public class CMSOperations implements CMSOperationsMBean ClusterMetadata metadata = ClusterMetadata.current(); String members = metadata.fullCMSMembers().stream().sorted().map(Object::toString).collect(Collectors.joining(",")); info.put(MEMBERS, members); - CMSPlacementStrategy placementStrategy = CMSPlacementStrategy.fromReplicationParams(ReplicationParams.meta(metadata), nodeId -> true); - info.put(NEEDS_RECONFIGURATION, Boolean.toString(placementStrategy.needsReconfiguration(metadata))); + info.put(NEEDS_RECONFIGURATION, Boolean.toString(needsReconfiguration(metadata))); info.put(IS_MEMBER, Boolean.toString(cms.isCurrentMember(FBUtilities.getBroadcastAddressAndPort()))); info.put(SERVICE_STATE, ClusterMetadataService.state(metadata).toString()); info.put(IS_MIGRATING, Boolean.toString(cms.isMigrating())); diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java index 236e82392a..845b6741f9 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -41,6 +42,7 @@ import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.ExceptionCode; import org.apache.cassandra.exceptions.StartupException; +import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.io.util.FileInputStreamPlus; import org.apache.cassandra.io.util.FileOutputStreamPlus; import org.apache.cassandra.locator.InetAddressAndPort; @@ -74,6 +76,7 @@ import org.apache.cassandra.utils.concurrent.ImmediateFuture; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.stream.Collectors.toSet; +import static org.apache.cassandra.config.CassandraRelevantProperties.TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE; import static org.apache.cassandra.tcm.ClusterMetadataService.State.GOSSIP; import static org.apache.cassandra.tcm.ClusterMetadataService.State.LOCAL; import static org.apache.cassandra.tcm.ClusterMetadataService.State.REMOTE; @@ -364,7 +367,13 @@ public class ClusterMetadataService public void reconfigureCMS(ReplicationParams replicationParams) { - Transformation transformation = new PrepareCMSReconfiguration.Complex(replicationParams); + ClusterMetadata metadata = ClusterMetadata.current(); + Set<NodeId> downNodes = new HashSet<>(); + for (InetAddressAndPort ep : metadata.directory.allJoinedEndpoints()) + if (!FailureDetector.instance.isAlive(ep)) + downNodes.add(metadata.directory.peerId(ep)); + PrepareCMSReconfiguration.Complex transformation = new PrepareCMSReconfiguration.Complex(replicationParams, downNodes); + transformation.verify(metadata); ClusterMetadataService.instance() .commit(transformation); @@ -374,6 +383,13 @@ public class ClusterMetadataService public void ensureCMSPlacement(ClusterMetadata metadata) { + if (TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE.getBoolean()) + { + logger.info("Not performing CMS reconfiguration as {} property is set. This should only be used for testing.", + TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE.getKey()); + return; + } + try { reconfigureCMS(ReplicationParams.meta(metadata)); diff --git a/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java b/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java index 1a96833ffb..fb6d9998c4 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java +++ b/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java @@ -21,6 +21,7 @@ package org.apache.cassandra.tcm.sequences; import java.io.IOException; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -205,9 +206,15 @@ public class ReconfigureCMS extends MultiStepOperation<AdvanceCMSReconfiguration { if (!metadata.fullCMSMembers().contains(toRemove)) return; + Set<NodeId> downNodes = new HashSet<>(); + for (InetAddressAndPort ep : metadata.directory.allJoinedEndpoints()) + if (!FailureDetector.instance.isAlive(ep)) + downNodes.add(metadata.directory.peerId(ep)); + PrepareCMSReconfiguration.Simple transformation = new PrepareCMSReconfiguration.Simple(metadata.directory.peerId(toRemove), downNodes); + transformation.verify(metadata); // We can force removal from the CMS as it doesn't alter the size of the service - ClusterMetadataService.instance().commit(new PrepareCMSReconfiguration.Simple(metadata.directory.peerId(toRemove))); + ClusterMetadataService.instance().commit(transformation); InProgressSequences.finishInProgressSequences(SequenceKey.instance); if (ClusterMetadata.current().isCMSMember(toRemove)) diff --git a/src/java/org/apache/cassandra/tcm/serialization/Version.java b/src/java/org/apache/cassandra/tcm/serialization/Version.java index 7960c963ed..108348a504 100644 --- a/src/java/org/apache/cassandra/tcm/serialization/Version.java +++ b/src/java/org/apache/cassandra/tcm/serialization/Version.java @@ -40,6 +40,7 @@ public enum Version V2(2), /** * - Serialize allowAutoSnapshot and incrementalBackups when serializing TableParams + * - down nodes serialized in PrepareCMSReconfiguration */ V3(3), diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java b/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java index 2d41fb53a6..c8b15c4fa5 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java +++ b/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java @@ -20,10 +20,15 @@ package org.apache.cassandra.tcm.transformations.cms; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -33,6 +38,8 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.CMSPlacementStrategy; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.ReplicationFactor; import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; @@ -49,13 +56,30 @@ import org.apache.cassandra.tcm.serialization.Version; import static org.apache.cassandra.exceptions.ExceptionCode.INVALID; import static org.apache.cassandra.locator.MetaStrategy.entireRange; +import static org.apache.cassandra.tcm.CMSOperations.REPLICATION_FACTOR; -public class PrepareCMSReconfiguration +public abstract class PrepareCMSReconfiguration implements Transformation { private static final Logger logger = LoggerFactory.getLogger(PrepareCMSReconfiguration.class); + final Set<NodeId> downNodes; - private static Transformation.Result executeInternal(ClusterMetadata prev, Function<ClusterMetadata.Transformer, ClusterMetadata.Transformer> transform, Diff diff) + public PrepareCMSReconfiguration(Set<NodeId> downNodes) { + this.downNodes = downNodes; + } + + protected abstract Predicate<NodeId> additionalFilteringPredicate(Set<NodeId> downNodes); + protected abstract ReplicationParams newReplicationParams(ClusterMetadata prev); + + protected Transformation.Result executeInternal(ClusterMetadata prev, Function<ClusterMetadata.Transformer, ClusterMetadata.Transformer> transform) + { + Diff diff = getDiff(prev); + if (!diff.hasChanges()) + { + logger.info("Proposed CMS reconfiguration resulted in no required modifications at epoch {}", prev.epoch.getEpoch()); + return Transformation.success(prev.transformer(), LockedRanges.AffectedRanges.EMPTY); + } + LockedRanges.Key lockKey = LockedRanges.keyFor(prev.nextEpoch()); Set<NodeId> cms = prev.fullCMSMembers().stream().map(prev.directory::peerId).collect(Collectors.toSet()); Set<NodeId> tmp = new HashSet<>(cms); @@ -71,17 +95,81 @@ public class PrepareCMSReconfiguration return Transformation.success(transform.apply(transformer), LockedRanges.AffectedRanges.EMPTY); } - public static class Simple implements Transformation + private Diff getDiff(ClusterMetadata prev) + { + Set<NodeId> currentCms = prev.fullCMSMemberIds(); + Map<String, Integer> dcRF = extractRf(newReplicationParams(prev)); + Set<NodeId> newCms = prepareNewCMS(dcRF, prev); + if (newCms.equals(currentCms)) + return Diff.NOCHANGE; + return diff(currentCms, newCms); + } + + private Set<NodeId> prepareNewCMS(Map<String, Integer> dcRf, ClusterMetadata prev) + { + CMSPlacementStrategy placementStrategy = new CMSPlacementStrategy(dcRf, additionalFilteringPredicate(downNodes)); + return placementStrategy.reconfigure(prev); + } + + public void verify(ClusterMetadata prev) + { + Map<String, Integer> dcRf = extractRf(newReplicationParams(prev)); + int expectedSize = dcRf.values().stream().mapToInt(Integer::intValue).sum(); + Set<NodeId> newCms = prepareNewCMS(dcRf, prev); + if (newCms.size() < (expectedSize / 2) + 1) + throw new IllegalStateException("Too many nodes are currently DOWN to safely perform the reconfiguration"); + } + + private static void serializeDownNodes(PrepareCMSReconfiguration transformation, DataOutputPlus out, Version version) throws IOException + { + out.writeUnsignedVInt32(transformation.downNodes.size()); + for (NodeId nodeId : transformation.downNodes) + NodeId.serializer.serialize(nodeId, out, version); + } + + private static Set<NodeId> deserializeDownNodes(DataInputPlus in, Version version) throws IOException + { + Set<NodeId> downNodes = new HashSet<>(); + int count = in.readUnsignedVInt32(); + for (int i = 0; i < count; i++) + downNodes.add(NodeId.serializer.deserialize(in, version)); + return downNodes; + } + + private static long serializedDownNodesSize(PrepareCMSReconfiguration transformation, Version version) + { + long size = TypeSizes.sizeofUnsignedVInt(transformation.downNodes.size()); + for (NodeId nodeId : transformation.downNodes) + size += NodeId.serializer.serializedSize(nodeId, version); + return size; + } + + public static class Simple extends PrepareCMSReconfiguration { public static final Simple.Serializer serializer = new Serializer(); private final NodeId toReplace; - public Simple(NodeId toReplace) + public Simple(NodeId toReplace, Set<NodeId> downNodes) { + super(downNodes); this.toReplace = toReplace; } + @Override + protected Predicate<NodeId> additionalFilteringPredicate(Set<NodeId> downNodes) + { + // exclude the node being replaced from the new CMS, and avoid any down nodes + return nodeId -> !nodeId.equals(toReplace) && !downNodes.contains(nodeId); + } + + @Override + protected ReplicationParams newReplicationParams(ClusterMetadata prev) + { + // a simple reconfiguration retains the existing replication params + return ReplicationParams.meta(prev); + } + @Override public Kind kind() { @@ -94,27 +182,16 @@ public class PrepareCMSReconfiguration if (!prev.fullCMSMembers().contains(prev.directory.getNodeAddresses(toReplace).broadcastAddress)) return new Rejected(INVALID, String.format("%s is not a member of CMS. Members: %s", toReplace, prev.fullCMSMembers())); - ReplicationParams metaParams = ReplicationParams.meta(prev); - CMSPlacementStrategy placementStrategy = CMSPlacementStrategy.fromReplicationParams(metaParams, nodeId -> !nodeId.equals(toReplace)); - Set<NodeId> currentCms = prev.fullCMSMembers() - .stream() - .map(prev.directory::peerId) - .collect(Collectors.toSet()); - - Set<NodeId> newCms = placementStrategy.reconfigure(prev); - if (newCms.equals(currentCms)) - { - logger.info("Proposed CMS reconfiguration resulted in no required modifications at epoch {}", prev.epoch.getEpoch()); - return Transformation.success(prev.transformer(), LockedRanges.AffectedRanges.EMPTY); - } - Diff diff = diff(currentCms, newCms); - return executeInternal(prev, t -> t, diff); + // A simple reconfiguration only kicks off the sequence of membership changes, no additional metadata + // transformation is required. + return executeInternal(prev, t -> t); } public String toString() { return "PrepareCMSReconfiguration#Simple{" + "toReplace=" + toReplace + + ", downNodes=" + downNodes + '}'; } @@ -122,34 +199,58 @@ public class PrepareCMSReconfiguration { public void serialize(Transformation t, DataOutputPlus out, Version version) throws IOException { - Simple tranformation = (Simple) t; - NodeId.serializer.serialize(tranformation.toReplace, out, version); + Simple transformation = (Simple) t; + NodeId.serializer.serialize(transformation.toReplace, out, version); + if (version.isAtLeast(Version.V3)) + PrepareCMSReconfiguration.serializeDownNodes(transformation, out, version); } public Simple deserialize(DataInputPlus in, Version version) throws IOException { - return new Simple(NodeId.serializer.deserialize(in, version)); + NodeId replaceNode = NodeId.serializer.deserialize(in, version); + Set<NodeId> downNodes = version.isAtLeast(Version.V3) + ? PrepareCMSReconfiguration.deserializeDownNodes(in, version) + : Collections.emptySet(); + return new Simple(replaceNode, downNodes); } public long serializedSize(Transformation t, Version version) { - Simple tranformation = (Simple) t; - return NodeId.serializer.serializedSize(tranformation.toReplace, version); + Simple transformation = (Simple) t; + long size = NodeId.serializer.serializedSize(transformation.toReplace, version); + if (version.isAtLeast(Version.V3)) + size += PrepareCMSReconfiguration.serializedDownNodesSize(transformation, version); + return size; } } } - public static class Complex implements Transformation + public static class Complex extends PrepareCMSReconfiguration { public static final Complex.Serializer serializer = new Complex.Serializer(); private final ReplicationParams replicationParams; - public Complex(ReplicationParams replicationParams) + public Complex(ReplicationParams replicationParams, Set<NodeId> downNodes) { + super(downNodes); this.replicationParams = replicationParams; } + @Override + protected Predicate<NodeId> additionalFilteringPredicate(Set<NodeId> downNodes) + { + // exclude any down nodes + return nodeId -> !downNodes.contains(nodeId); + } + + @Override + protected ReplicationParams newReplicationParams(ClusterMetadata ignored) + { + // desired replication params are supplied, so just return them + return replicationParams; + } + @Override public Kind kind() { @@ -159,34 +260,21 @@ public class PrepareCMSReconfiguration @Override public Result execute(ClusterMetadata prev) { + // In a complex reconfiguration, in addition to initiating the sequence of membership changes, + // we're modifying the replication params of the metadata keyspace so we supply a function to do that KeyspaceMetadata keyspace = prev.schema.getKeyspaceMetadata(SchemaConstants.METADATA_KEYSPACE_NAME); KeyspaceMetadata newKeyspace = keyspace.withSwapped(new KeyspaceParams(keyspace.params.durableWrites, replicationParams)); - CMSPlacementStrategy placementStrategy = CMSPlacementStrategy.fromReplicationParams(replicationParams, nodeId -> true); - - Set<NodeId> currentCms = prev.fullCMSMembers() - .stream() - .map(prev.directory::peerId) - .collect(Collectors.toSet()); - - Set<NodeId> newCms = placementStrategy.reconfigure(prev); - if (newCms.equals(currentCms)) - { - logger.info("Proposed CMS reconfiguration resulted in no required modifications at epoch {}", prev.epoch.getEpoch()); - return Transformation.success(prev.transformer(), LockedRanges.AffectedRanges.EMPTY); - } - Diff diff = diff(currentCms, newCms); - return executeInternal(prev, - transformer -> transformer.with(prev.placements.replaceParams(prev.nextEpoch(),ReplicationParams.meta(prev), replicationParams)) - .with(new DistributedSchema(prev.schema.getKeyspaces().withAddedOrUpdated(newKeyspace))), - diff); + transformer -> transformer.with(prev.placements.replaceParams(prev.nextEpoch(), ReplicationParams.meta(prev), replicationParams)) + .with(new DistributedSchema(prev.schema.getKeyspaces().withAddedOrUpdated(newKeyspace)))); } public String toString() { return "PrepareCMSReconfiguration#Complex{" + "replicationParams=" + replicationParams + + ", downNodes=" + downNodes + '}'; } @@ -194,19 +282,28 @@ public class PrepareCMSReconfiguration { public void serialize(Transformation t, DataOutputPlus out, Version version) throws IOException { - Complex tranformation = (Complex) t; - ReplicationParams.serializer.serialize(tranformation.replicationParams, out, version); + Complex transformation = (Complex) t; + ReplicationParams.serializer.serialize(transformation.replicationParams, out, version); + if (version.isAtLeast(Version.V3)) + PrepareCMSReconfiguration.serializeDownNodes(transformation, out, version); } public Complex deserialize(DataInputPlus in, Version version) throws IOException { - return new Complex(ReplicationParams.serializer.deserialize(in, version)); + ReplicationParams params = ReplicationParams.serializer.deserialize(in, version); + Set<NodeId> downNodes = version.isAtLeast(Version.V3) + ? PrepareCMSReconfiguration.deserializeDownNodes(in, version) + : Collections.emptySet(); + return new Complex(params, downNodes); } public long serializedSize(Transformation t, Version version) { - Complex tranformation = (Complex) t; - return ReplicationParams.serializer.serializedSize(tranformation.replicationParams, version); + Complex transformation = (Complex) t; + long size = ReplicationParams.serializer.serializedSize(transformation.replicationParams, version); + if (version.isAtLeast(Version.V3)) + size += PrepareCMSReconfiguration.serializedDownNodesSize(transformation, version); + return size; } } } @@ -233,8 +330,51 @@ public class PrepareCMSReconfiguration return new Diff(additions, removals); } + private static Map<String, Integer> extractRf(ReplicationParams params) + { + if (params.isMeta()) + { + assert !params.options.containsKey(REPLICATION_FACTOR); + Map<String, Integer> dcRf = new HashMap<>(); + for (Map.Entry<String, String> entry : params.options.entrySet()) + { + String dc = entry.getKey(); + ReplicationFactor rf = ReplicationFactor.fromString(entry.getValue()); + dcRf.put(dc, rf.fullReplicas); + } + return dcRf; + } + else + { + throw new IllegalStateException("Can't parse the params: " + params); + } + } + + public static boolean needsReconfiguration(ClusterMetadata metadata) + { + Map<String, Integer> dcRf = extractRf(ReplicationParams.meta(metadata)); + Set<NodeId> currentCms = metadata.fullCMSMembers() + .stream() + .map(metadata.directory::peerId) + .collect(Collectors.toSet()); + int expectedSize = dcRf.values().stream().mapToInt(Integer::intValue).sum(); + if (currentCms.size() != expectedSize) + return true; + for (Map.Entry<String, Integer> dcRfEntry : dcRf.entrySet()) + { + Collection<InetAddressAndPort> nodesInDc = metadata.directory.allDatacenterEndpoints().get(dcRfEntry.getKey()); + if (nodesInDc.size() < dcRfEntry.getValue()) + return true; + } + + CMSPlacementStrategy placementStrategy = new CMSPlacementStrategy(dcRf, nodeId -> true); + Set<NodeId> newCms = placementStrategy.reconfigure(metadata); + return !currentCms.equals(newCms); + } + public static class Diff { + public static final Diff NOCHANGE = new Diff(Collections.emptyList(), Collections.emptyList()); public static final Serializer serializer = new Serializer(); public final List<NodeId> additions; @@ -246,6 +386,11 @@ public class PrepareCMSReconfiguration this.removals = removals; } + public boolean hasChanges() + { + return this != NOCHANGE; + } + public String toString() { return "Diff{" + diff --git a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java index 80da569783..a08ba58a06 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.function.BiConsumer; import java.util.function.Consumer; - import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -31,7 +30,6 @@ import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.distributed.Cluster; @@ -43,7 +41,6 @@ import org.apache.cassandra.distributed.api.IMessageFilters; import org.apache.cassandra.distributed.shared.ClusterUtils; import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; import org.apache.cassandra.exceptions.CasWriteTimeoutException; - import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.SimpleSeedProvider; import org.apache.cassandra.net.Verb; @@ -54,9 +51,11 @@ import org.apache.cassandra.tcm.sequences.InProgressSequences; import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.config.CassandraRelevantProperties.PAXOS_USE_SELF_EXECUTION; +import static org.apache.cassandra.config.CassandraRelevantProperties.TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE; +import static org.apache.cassandra.config.CassandraRelevantProperties.TCM_USE_ATOMIC_LONG_PROCESSOR; import static org.apache.cassandra.distributed.api.ConsistencyLevel.ANY; -import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE; import static org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_QUORUM; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE; import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM; import static org.apache.cassandra.distributed.api.ConsistencyLevel.SERIAL; import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; @@ -76,10 +75,6 @@ import static org.junit.Assert.fail; public class CASTest extends CASCommonTestCases { - static - { - CassandraRelevantProperties.TCM_USE_ATOMIC_LONG_PROCESSOR.setBoolean(true); - } /** * The {@code cas_contention_timeout} used during the tests @@ -98,6 +93,8 @@ public class CASTest extends CASCommonTestCases public static void beforeClass() throws Throwable { PAXOS_USE_SELF_EXECUTION.setBoolean(false); + TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE.setBoolean(true); + TCM_USE_ATOMIC_LONG_PROCESSOR.setBoolean(true); TestBaseImpl.beforeClass(); // At times during these tests, node1 is going to be blocked from appending entries to its local metadata // log in order to induce divergent views of cluster topology between instances. This precludes it from diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java index b3dbdf796d..d06d79afd1 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java @@ -827,7 +827,7 @@ public class ClusterMetadataTestHelper public static void reconfigureCms(ReplicationParams replication) { - ClusterMetadata metadata = ClusterMetadataService.instance().commit(new PrepareCMSReconfiguration.Complex(replication)); + ClusterMetadata metadata = ClusterMetadataService.instance().commit(new PrepareCMSReconfiguration.Complex(replication, Collections.emptySet())); while (metadata.inProgressSequences.contains(ReconfigureCMS.SequenceKey.instance)) { AdvanceCMSReconfiguration next = ((ReconfigureCMS) metadata.inProgressSequences.get(ReconfigureCMS.SequenceKey.instance)).next; diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java index 89253b72b8..931ee320e5 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java @@ -529,7 +529,7 @@ public class MetadataChangeSimulationTest extends CMSTestBase for (String s : ntsRf.asMap().keySet()) cmsRf.put(s, 3); - simulateBounces(ntsRf, new CMSPlacementStrategy.DatacenterAware(cmsRf, (cm, n) -> random.nextInt(10) > 1), random); + simulateBounces(ntsRf, new CMSPlacementStrategy(cmsRf, (cm, n) -> random.nextInt(10) > 1), random); } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSStreamingTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSStreamingTest.java new file mode 100644 index 0000000000..909e0a46d3 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSStreamingTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.log; + +import java.io.IOException; +import java.util.Arrays; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.sequences.AddToCMS; +import org.apache.cassandra.tcm.transformations.cms.RemoveFromCMS; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class ReconfigureCMSStreamingTest extends TestBaseImpl +{ + @Test + public void testRF1() throws IOException + { + /* + 1. place the CMS on the "wrong" node (2) + 2. execute a bunch of schema changes + 3. run "cms reconfigure 1" which moves the cms back to the correct node (1) + 4. make sure node1 has all the epochs that node2 had before reconfiguration + */ + try (Cluster cluster = init(Cluster.build() + .withNodes(3) + .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK)) + .start())) + { + cluster.get(2).runOnInstance(() -> AddToCMS.initiate()); + cluster.get(1).runOnInstance(() -> ClusterMetadataService.instance().commit(new RemoveFromCMS(InetAddressAndPort.getByNameUnchecked("127.0.0.1"), true))); + + for (int i = 0; i < 20; i++) + cluster.schemaChange(withKeyspace("create table %s.tbl"+i+" (id int primary key)")); + + long[] epochsBefore = epochs(cluster.get(2).executeInternal("select * from system_cluster_metadata.distributed_metadata_log")); + cluster.get(3).nodetoolResult("cms", "reconfigure", "1"); + long[] epochsAfter = epochs(cluster.get(1).executeInternal("select epoch from system_cluster_metadata.distributed_metadata_log")); + assertTrue(epochsBefore.length > 20); // at least 20 schema changes above + assertTrue(epochsAfter.length > epochsBefore.length); // we get a few more epochs from reconfiguration + for (int i = 0; i < epochsBefore.length; i++) + assertEquals(epochsBefore[i] + " != " + epochsAfter[i], epochsBefore[i], epochsAfter[i]); + } + } + + private long[] epochs(Object[][] res) + { + long[] epochs = new long[res.length]; + for (int i = 0; i < res.length; i++) + epochs[i] = (long)res[i][0]; + Arrays.sort(epochs); + return epochs; + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java index e8713ae35f..d1f983f19a 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java @@ -18,7 +18,12 @@ package org.apache.cassandra.distributed.test.log; +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; import java.util.Random; +import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import org.junit.Assert; @@ -27,6 +32,9 @@ import org.junit.Test; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.shared.ClusterUtils; import org.apache.cassandra.distributed.shared.NetworkTopology; import org.apache.cassandra.locator.MetaStrategy; import org.apache.cassandra.schema.DistributedMetadataLogKeyspace; @@ -40,6 +48,15 @@ import org.apache.cassandra.tcm.sequences.ReconfigureCMS; import org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration; import org.apache.cassandra.utils.FBUtilities; +import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK; +import static org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin; +import static org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart; +import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack; +import static org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.psjava.util.AssertStatus.assertTrue; + public class ReconfigureCMSTest extends FuzzTestBase { @Test @@ -59,8 +76,8 @@ public class ReconfigureCMSTest extends FuzzTestBase cluster.get(nodeSelector.get()).nodetoolResult("cms", "reconfigure", "5").asserts().success(); cluster.get(1).runOnInstance(() -> { ClusterMetadata metadata = ClusterMetadata.current(); - Assert.assertEquals(5, metadata.fullCMSMembers().size()); - Assert.assertEquals(ReplicationParams.simpleMeta(5, metadata.directory.knownDatacenters()), + assertEquals(5, metadata.fullCMSMembers().size()); + assertEquals(ReplicationParams.simpleMeta(5, metadata.directory.knownDatacenters()), metadata.placements.keys().stream().filter(ReplicationParams::isMeta).findFirst().get()); }); cluster.stream().forEach(i -> { @@ -70,8 +87,8 @@ public class ReconfigureCMSTest extends FuzzTestBase cluster.get(nodeSelector.get()).nodetoolResult("cms", "reconfigure", "1").asserts().success(); cluster.get(1).runOnInstance(() -> { ClusterMetadata metadata = ClusterMetadata.current(); - Assert.assertEquals(1, metadata.fullCMSMembers().size()); - Assert.assertEquals(ReplicationParams.simpleMeta(1, metadata.directory.knownDatacenters()), + assertEquals(1, metadata.fullCMSMembers().size()); + assertEquals(ReplicationParams.simpleMeta(1, metadata.directory.knownDatacenters()), metadata.placements.keys().stream().filter(ReplicationParams::isMeta).findFirst().get()); }); } @@ -89,7 +106,7 @@ public class ReconfigureCMSTest extends FuzzTestBase { cluster.get(1).nodetoolResult("cms", "reconfigure", "2").asserts().success(); cluster.get(1).runOnInstance(() -> { - ClusterMetadataService.instance().commit(new PrepareCMSReconfiguration.Complex(ReplicationParams.simple(3).asMeta())); + ClusterMetadataService.instance().commit(new PrepareCMSReconfiguration.Complex(ReplicationParams.simple(3).asMeta(), Collections.emptySet())); ReconfigureCMS reconfigureCMS = (ReconfigureCMS) ClusterMetadata.current().inProgressSequences.get(ReconfigureCMS.SequenceKey.instance); ClusterMetadataService.instance().commit(reconfigureCMS.next); ProgressBarrier.propagateLast(MetaStrategy.affectedRanges(ClusterMetadata.current())); @@ -110,15 +127,15 @@ public class ReconfigureCMSTest extends FuzzTestBase ProgressBarrier.propagateLast(MetaStrategy.affectedRanges(ClusterMetadata.current())); ClusterMetadata metadata = ClusterMetadata.current(); Assert.assertNull(metadata.inProgressSequences.get(ReconfigureCMS.SequenceKey.instance)); - Assert.assertEquals(2, metadata.fullCMSMembers().size()); + assertEquals(2, metadata.fullCMSMembers().size()); ReplicationParams params = ReplicationParams.meta(metadata); DataPlacement placements = metadata.placements.get(params); - Assert.assertEquals(placements.reads, placements.writes); - Assert.assertEquals(metadata.fullCMSMembers().size(), Integer.parseInt(params.asMap().get("dc0"))); + assertEquals(placements.reads, placements.writes); + assertEquals(metadata.fullCMSMembers().size(), Integer.parseInt(params.asMap().get("dc0"))); }); cluster.get(1).runOnInstance(() -> { - ClusterMetadataService.instance().commit(new PrepareCMSReconfiguration.Complex(ReplicationParams.simple(4).asMeta())); + ClusterMetadataService.instance().commit(new PrepareCMSReconfiguration.Complex(ReplicationParams.simple(4).asMeta(), Collections.emptySet())); ProgressBarrier.propagateLast(MetaStrategy.affectedRanges(ClusterMetadata.current())); ReconfigureCMS reconfigureCMS = (ReconfigureCMS) ClusterMetadata.current().inProgressSequences.get(ReconfigureCMS.SequenceKey.instance); @@ -136,10 +153,116 @@ public class ReconfigureCMSTest extends FuzzTestBase ClusterMetadata metadata = ClusterMetadata.current(); Assert.assertNull(metadata.inProgressSequences.get(ReconfigureCMS.SequenceKey.instance)); Assert.assertTrue(metadata.fullCMSMembers().contains(FBUtilities.getBroadcastAddressAndPort())); - Assert.assertEquals(3, metadata.fullCMSMembers().size()); + assertEquals(3, metadata.fullCMSMembers().size()); DataPlacement placements = metadata.placements.get(ReplicationParams.meta(metadata)); - Assert.assertEquals(placements.reads, placements.writes); + assertEquals(placements.reads, placements.writes); + }); + } + } + + @Test + public void testReconfigureTooManyNodesDown() throws IOException, ExecutionException, InterruptedException + { + try (Cluster cluster = init(Cluster.build(3) + .withConfig(conf -> conf.with(Feature.NETWORK, Feature.GOSSIP)) + .start())) + { + cluster.get(2).shutdown().get(); + cluster.get(3).shutdown().get(); + // Fails as the CMS size would be less than a quorum of what was specified (i.e. 3/2 + 1) + cluster.get(1).nodetoolResult("cms", "reconfigure", "3").asserts().failure(); + cluster.get(2).startup(); + cluster.get(1).runOnInstance(() -> assertEquals(1, ClusterMetadata.current().fullCMSMembers().size())); + + // Succeeds, but flags that a further reconfiguration is required + cluster.get(1).nodetoolResult("cms", "reconfigure", "3").asserts().success(); + cluster.get(1).runOnInstance(() -> assertEquals(2, ClusterMetadata.current().fullCMSMembers().size())); + cluster.get(1).runOnInstance(() -> assertTrue(PrepareCMSReconfiguration.needsReconfiguration(ClusterMetadata.current()))); + + // All good + cluster.get(3).startup(); + cluster.get(1).nodetoolResult("cms", "reconfigure", "3").asserts().success(); + cluster.get(1).runOnInstance(() -> assertEquals(3, ClusterMetadata.current().fullCMSMembers().size())); + cluster.get(1).runOnInstance(() -> assertFalse(PrepareCMSReconfiguration.needsReconfiguration(ClusterMetadata.current()))); + } + } + + @Test + public void testReplaceSameSize() throws IOException, ExecutionException, InterruptedException + { + TokenSupplier even = TokenSupplier.evenlyDistributedTokens(3); + try (Cluster cluster = init(Cluster.build(3) + .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK)) + .withTokenSupplier(node -> even.token(node == 4 ? 2 : node)) + .start())) + { + cluster.get(1).nodetoolResult("cms", "reconfigure", "3").asserts().success(); + cluster.get(2).shutdown().get(); + // now create a new node to replace the other node + IInvokableInstance replacingNode = replaceHostAndStart(cluster, cluster.get(2), props -> { + // since we have a downed host there might be a schema version which is old show up but + // can't be fetched since the host is down... + props.set(BOOTSTRAP_SKIP_SCHEMA_CHECK, true); + }); + // wait till the replacing node is in the ring + awaitRingJoin(cluster.get(1), replacingNode); + awaitRingJoin(replacingNode, cluster.get(1)); + replacingNode.runOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + assertTrue(metadata.isCMSMember(FBUtilities.getBroadcastAddressAndPort())); + assertEquals(3, metadata.fullCMSMembers().size()); }); } } + + @Test + public void testReconfigurePickAliveNodesIfPossible() throws Exception + { + try (Cluster cluster = init(Cluster.build(5) + .withConfig(conf -> conf.with(Feature.NETWORK, Feature.GOSSIP)) + .start())) + { + cluster.get(2).shutdown().get(); + cluster.get(3).shutdown().get(); + cluster.get(1).nodetoolResult("cms", "reconfigure", "3").asserts().success(); + cluster.get(2).startup(); + cluster.get(3).startup(); + + Set<String> expectedCMSMembers = expectedCMS(cluster, 1, 4, 5); + cluster.forEach(inst -> assertEquals(expectedCMSMembers, ClusterUtils.getCMSMembers(inst))); + } + } + + @Test + public void testReconfigurationViolatesRackDiversityIfNecessary() throws Exception + { + // rack1: node1, node3 + // rack2: node2 + // rack4: node4 + // ideal placement for CMS is 1, 2, 4 but if 2 is down, violate rack diversity and pick 1, 3, 4 + try (Cluster cluster = init(Cluster.build(4) + .withNodeIdTopology(networkTopology(4, (nodeid) -> nodeid % 2 == 1 ? dcAndRack("dc1", "rack1") + : dcAndRack("dc1", "rack" + nodeid))) + .withConfig(conf -> conf.with(Feature.NETWORK, Feature.GOSSIP)) + .start())) + { + cluster.get(1).nodetoolResult("cms", "reconfigure", "3").asserts().success(); + Set<String> rackDiverse = expectedCMS(cluster, 1, 2, 4); + cluster.forEach(inst -> assertEquals(rackDiverse, ClusterUtils.getCMSMembers(inst))); + cluster.get(2).shutdown().get(); + cluster.get(1).nodetoolResult("cms", "reconfigure", "3").asserts().success(); + cluster.get(2).startup(); + Set<String> notRackDiverse = expectedCMS(cluster, 1, 4, 3); + cluster.forEach(inst -> assertEquals(notRackDiverse, ClusterUtils.getCMSMembers(inst))); + } + } + + // We can't assume that nodeId matches endpoint (ie node3 = 127.0.0.3 etc) + private Set<String> expectedCMS(Cluster cluster, int... instanceIds) + { + Set<String> expectedCMSMembers = new HashSet<>(instanceIds.length); + for (int id : instanceIds) + expectedCMSMembers.add(cluster.get(id).config().broadcastAddress().getAddress().toString()); + return expectedCMSMembers; + } } diff --git a/test/unit/org/apache/cassandra/locator/MetaStrategyTest.java b/test/unit/org/apache/cassandra/locator/MetaStrategyTest.java index d90cbd2464..c6a96fa71e 100644 --- a/test/unit/org/apache/cassandra/locator/MetaStrategyTest.java +++ b/test/unit/org/apache/cassandra/locator/MetaStrategyTest.java @@ -110,7 +110,7 @@ public class MetaStrategyTest rf.put("dc2", 2); rf.put("dc3", 2); - CMSPlacementStrategy placementStrategy = new CMSPlacementStrategy.DatacenterAware(rf, (cd, n) -> true); + CMSPlacementStrategy placementStrategy = new CMSPlacementStrategy(rf, (cd, n) -> true); Assert.assertEquals(nodeIds(metadata.directory, 1, 2, 4, 5, 7, 8), placementStrategy.reconfigure(metadata)); @@ -119,8 +119,8 @@ public class MetaStrategyTest 1, 2, 4, 5, 7, 8), placementStrategy.reconfigure(metadata)); - placementStrategy = new CMSPlacementStrategy.DatacenterAware(rf, (cd, n) -> !n.equals(metadata.directory.peerId(addr(2).broadcastAddress)) && - !n.equals(metadata.directory.peerId(addr(2).broadcastAddress))); + placementStrategy = new CMSPlacementStrategy(rf, (cd, n) -> !n.equals(metadata.directory.peerId(addr(2).broadcastAddress)) && + !n.equals(metadata.directory.peerId(addr(2).broadcastAddress))); Assert.assertEquals(nodeIds(metadata.directory, 1, 3, 4, 5, 7, 8), placementStrategy.reconfigure(metadata)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org