This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 51d048a93a Add cluster metadata id to gossip syn messages 51d048a93a is described below commit 51d048a93a7e7cfb93a544dabba4b6f7aa1bbdd1 Author: Sam Tunnicliffe <s...@apache.org> AuthorDate: Mon Apr 29 11:36:51 2024 +0100 Add cluster metadata id to gossip syn messages Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-19613 --- CHANGES.txt | 1 + .../org/apache/cassandra/gms/GossipDigestSyn.java | 15 +- .../cassandra/gms/GossipDigestSynVerbHandler.java | 7 + src/java/org/apache/cassandra/gms/Gossiper.java | 3 + src/java/org/apache/cassandra/gms/NewGossiper.java | 6 +- test/data/serialization/5.1/gms.EndpointState.bin | Bin 73 -> 73 bytes test/data/serialization/5.1/gms.Gossip.bin | Bin 166 -> 4914 bytes .../distributed/test/tcm/SplitBrainTest.java | 161 ++++++++++++++++++--- .../apache/cassandra/gms/SerializationsTest.java | 7 +- .../org/apache/cassandra/net/HandshakeTest.java | 3 +- .../cassandra/net/OutboundConnectionsTest.java | 3 +- 11 files changed, 179 insertions(+), 27 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 168f40d224..c8355deda2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Add cluster metadata id to gossip syn messages (CASSANDRA-19613) * Reduce heap usage occupied by the metrics (CASSANDRA-19567) * Improve handling of transient replicas during range movements (CASSANDRA-19344) * Enable debounced internode log requests to be cancelled at shutdown (CASSANDRA-19514) diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSyn.java b/src/java/org/apache/cassandra/gms/GossipDigestSyn.java index 7c2ae945c8..ec4639f087 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigestSyn.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestSyn.java @@ -25,6 +25,8 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import 
org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.tcm.ClusterMetadata; /** * This is the first message that gets sent out as a start of the Gossip protocol in a @@ -37,12 +39,14 @@ public class GossipDigestSyn final String clusterId; final String partioner; + final int metadataId; final List<GossipDigest> gDigests; - public GossipDigestSyn(String clusterId, String partioner, List<GossipDigest> gDigests) + public GossipDigestSyn(String clusterId, String partioner, int metadataId, List<GossipDigest> gDigests) { this.clusterId = clusterId; this.partioner = partioner; + this.metadataId = metadataId; this.gDigests = gDigests; } @@ -85,6 +89,8 @@ class GossipDigestSynSerializer implements IVersionedSerializer<GossipDigestSyn> { out.writeUTF(gDigestSynMessage.clusterId); out.writeUTF(gDigestSynMessage.partioner); + if (version >= MessagingService.VERSION_51) + out.writeUnsignedVInt32(gDigestSynMessage.metadataId); GossipDigestSerializationHelper.serialize(gDigestSynMessage.gDigests, out, version); } @@ -93,14 +99,19 @@ class GossipDigestSynSerializer implements IVersionedSerializer<GossipDigestSyn> String clusterId = in.readUTF(); String partioner = null; partioner = in.readUTF(); + int metadataId = version >= MessagingService.VERSION_51 + ? 
in.readUnsignedVInt32() + : ClusterMetadata.EMPTY_METADATA_IDENTIFIER; List<GossipDigest> gDigests = GossipDigestSerializationHelper.deserialize(in, version); - return new GossipDigestSyn(clusterId, partioner, gDigests); + return new GossipDigestSyn(clusterId, partioner, metadataId, gDigests); } public long serializedSize(GossipDigestSyn syn, int version) { long size = TypeSizes.sizeof(syn.clusterId); size += TypeSizes.sizeof(syn.partioner); + if (version >= MessagingService.VERSION_51) + size += TypeSizes.sizeofUnsignedVInt(syn.metadataId); size += GossipDigestSerializationHelper.serializedSize(syn.gDigests, version); return size; } diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java index c2713863cc..692e962528 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java @@ -30,6 +30,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.tcm.ClusterMetadata; import static org.apache.cassandra.net.Verb.GOSSIP_DIGEST_ACK; @@ -65,6 +66,12 @@ public class GossipDigestSynVerbHandler extends GossipVerbHandler<GossipDigestSy return; } + if (gDigestMessage.metadataId != ClusterMetadata.current().metadataIdentifier) + { + logger.warn("Cluster metadata identifier mismatch from {} {}!={}", from, gDigestMessage.metadataId, ClusterMetadata.current().metadataIdentifier); + return; + } + List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests(); // if the syn comes from a peer performing a shadow round and this node is // also currently in a shadow round, send back a minimal ack. 
This node must diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 35cf57d3e1..7d3e0c9cc5 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -260,6 +260,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean, { GossipDigestSyn digestSynMessage = new GossipDigestSyn(getClusterName(), getPartitionerName(), + ClusterMetadata.current().metadataIdentifier, gDigests); Message<GossipDigestSyn> message = Message.out(GOSSIP_DIGEST_SYN, digestSynMessage); /* Gossip to some random live member */ @@ -1353,6 +1354,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean, InetAddressAndPort ep = entry.getKey(); if (ep.equals(getBroadcastAddressAndPort())) continue; + if (justRemovedEndpoints.containsKey(ep)) { if (logger.isTraceEnabled()) @@ -2223,6 +2225,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean, Gossiper.instance.makeGossipDigest(gDigests); GossipDigestSyn digestSynMessage = new GossipDigestSyn(getClusterName(), getPartitionerName(), + ClusterMetadata.current().metadataIdentifier, gDigests); Message<GossipDigestSyn> message = Message.out(GOSSIP_DIGEST_SYN, digestSynMessage); sendGossip(message, cms); diff --git a/src/java/org/apache/cassandra/gms/NewGossiper.java b/src/java/org/apache/cassandra/gms/NewGossiper.java index 8473a40d34..4bb8b75968 100644 --- a/src/java/org/apache/cassandra/gms/NewGossiper.java +++ b/src/java/org/apache/cassandra/gms/NewGossiper.java @@ -38,6 +38,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessageDelivery; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.compatibility.GossipHelper; import org.apache.cassandra.utils.concurrent.Accumulator; import 
org.apache.cassandra.utils.concurrent.AsyncPromise; @@ -127,7 +128,10 @@ public class NewGossiper public Promise<Map<InetAddressAndPort, EndpointState>> doShadowRound() { // send a completely empty syn - GossipDigestSyn digestSynMessage = new GossipDigestSyn(getClusterName(), getPartitionerName(), new ArrayList<>()); + GossipDigestSyn digestSynMessage = new GossipDigestSyn(getClusterName(), + getPartitionerName(), + ClusterMetadata.current().metadataIdentifier, + new ArrayList<>()); Message<GossipDigestSyn> message = Message.out(GOSSIP_DIGEST_SYN, digestSynMessage); logger.info("Sending shadow round GOSSIP DIGEST SYN to known peers {}", peers); diff --git a/test/data/serialization/5.1/gms.EndpointState.bin b/test/data/serialization/5.1/gms.EndpointState.bin index d5c4eacd84..9baeac9816 100644 Binary files a/test/data/serialization/5.1/gms.EndpointState.bin and b/test/data/serialization/5.1/gms.EndpointState.bin differ diff --git a/test/data/serialization/5.1/gms.Gossip.bin b/test/data/serialization/5.1/gms.Gossip.bin index 7a4fb5666e..083da42747 100644 Binary files a/test/data/serialization/5.1/gms.Gossip.bin and b/test/data/serialization/5.1/gms.Gossip.bin differ diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/SplitBrainTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/SplitBrainTest.java index 9533eb393d..1382f8b063 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tcm/SplitBrainTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/SplitBrainTest.java @@ -19,49 +19,171 @@ package org.apache.cassandra.distributed.test.tcm; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Assert; import org.junit.Test; 
import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.IMessageFilters; +import org.apache.cassandra.distributed.shared.ClusterUtils; import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.SimpleSeedProvider; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.log.LogState; +import org.awaitility.Awaitility; import static org.apache.cassandra.distributed.api.Feature.GOSSIP; import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class SplitBrainTest extends TestBaseImpl { @Test public void testSplitBrainStartup() throws IOException, TimeoutException { - // partition the cluster in 2 parts on startup, node1, node2 in one, node3, node4 in the other - try (Cluster cluster = builder().withNodes(4) - .withConfig(config -> config.with(GOSSIP).with(NETWORK) - .set("seed_provider", new IInstanceConfig.ParameterizedClass(SimpleSeedProvider.class.getName(), - Collections.singletonMap("seeds", "127.0.0.1,127.0.0.3"))) - .set("discovery_timeout", "1s")) - .createWithoutStarting()) + try (Setup setup = setupSplitBrainCluster()) { - cluster.filters().allVerbs().from(1,2).to(3,4).drop(); - cluster.filters().allVerbs().from(3,4).to(1,2).drop(); - List<Thread> startupThreads = new ArrayList<>(4); - for (int i = 0; i < 4; i++) - { - int threadNr = i + 1; - startupThreads.add(new Thread(() -> cluster.get(threadNr).startup())); - } - startupThreads.forEach(Thread::start); - 
startupThreads.forEach(SplitBrainTest::join); - cluster.filters().reset(); + Cluster cluster = setup.cluster; + // Perform a schema change on one of the clusters resulting from the split brain during initialisation + // before dropping message filters. When comms can be reestablished, we fake the replication of metadata + // state from one cluster to the other. cluster.coordinator(1).execute(withKeyspace("create keyspace %s with replication = {'class':'SimpleStrategy', 'replication_factor':1}"), ConsistencyLevel.ALL); + long clusterOneEpoch = ClusterUtils.getCurrentEpoch(cluster.get(1)).getEpoch(); + long clusterTwoEpoch = ClusterUtils.getCurrentEpoch(cluster.get(3)).getEpoch(); + assertTrue(clusterOneEpoch > clusterTwoEpoch); + + // Artificially induce node1 to replicate to node3. This should be rejected by node3 as the two technically + // belong to different clusters. + long mark = cluster.get(3).logs().mark(); + // Turn off the initial filters + setup.reenableCommunication(); + + cluster.get(1).runOnInstance(() -> { + LogState state = LogState.getForRecovery(ClusterMetadata.current().epoch); + MessagingService.instance().send(Message.out(Verb.TCM_REPLICATION, state), + InetAddressAndPort.getByNameUnchecked("127.0.0.3")); + }); + cluster.get(3).logs().watchFor(mark, Duration.ofSeconds(10), "Cluster Metadata Identifier mismatch"); + assertEquals(clusterOneEpoch, ClusterUtils.getCurrentEpoch(cluster.get(1)).getEpoch()); + assertEquals(clusterTwoEpoch, ClusterUtils.getCurrentEpoch(cluster.get(3)).getEpoch()); + } + } + + + @Test + public void testFilterGossipStatesWithMismatchingMetadataId() throws IOException, TimeoutException + { + try (Setup setup = setupSplitBrainCluster()) + { + Cluster cluster = setup.cluster; + // Allow nodes from the two clusters to communicate again. Because each node's seed list contains an + // instance from the other cluster, they will attempt to perform gossip exchange with that instance.
+ // Verify that when this happens, gossip state isn't updated with instances from the other cluster. + AtomicInteger node1Received = new AtomicInteger(0); + AtomicInteger node3Received = new AtomicInteger(0); + + cluster.filters().inbound().from(1,2,3,4).to(1,2,3,4).messagesMatching((from, to, msg) -> { + if (msg.verb() == Verb.GOSSIP_DIGEST_SYN.id || + msg.verb() == Verb.GOSSIP_DIGEST_ACK.id || + msg.verb() == Verb.GOSSIP_DIGEST_ACK2.id) + { + if (to == 1 && (from == 3 || from == 4)) + node1Received.incrementAndGet(); + if (to == 3 && (from == 1 || from == 2)) + node3Received.incrementAndGet(); + } + return false; + }).drop().on(); + + // Turn off the initial filters + setup.reenableCommunication(); + + // Wait for cross-cluster gossip communication + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .until(() -> node1Received.get() > 5 && node3Received.get() > 5); + + // Verify that gossip states for nodes which are not a member of the same cluster were disregarded. + // Each node should have gossip state only for itself and the one other member of its cluster. + cluster.forEach(inst -> { + int id = inst.config().num(); + boolean gossipStateValid = inst.callOnInstance((() -> { + Map<InetAddressAndPort, EndpointState> eps = Gossiper.instance.endpointStateMap; + if (eps.size() != 2) + return false; + Collection<InetAddressAndPort> expectedEps = (id <= 2) + ? 
Arrays.asList(InetAddressAndPort.getByNameUnchecked("127.0.0.1"), + InetAddressAndPort.getByNameUnchecked("127.0.0.2")) + : Arrays.asList(InetAddressAndPort.getByNameUnchecked("127.0.0.3"), + InetAddressAndPort.getByNameUnchecked("127.0.0.4")); + return eps.keySet().containsAll(expectedEps); + })); + Assert.assertTrue(String.format("Unexpected gossip state on node %s", id), gossipStateValid); + }); + } + } - cluster.get(3).logs().watchFor("Cluster Metadata Identifier mismatch"); + private Setup setupSplitBrainCluster() throws IOException + { + // partition the cluster in 2 parts on startup, node1, node2 in one, node3, node4 in the other + Cluster cluster = builder().withNodes(4) + .withConfig(config -> config.with(GOSSIP).with(NETWORK) + .set("seed_provider", new IInstanceConfig.ParameterizedClass(SimpleSeedProvider.class.getName(), + Collections.singletonMap("seeds", "127.0.0.1,127.0.0.3"))) + .set("discovery_timeout", "1s")) + .createWithoutStarting(); + IMessageFilters.Filter drop1 = cluster.filters().allVerbs().from(1, 2).to(3, 4).drop(); + IMessageFilters.Filter drop2 = cluster.filters().allVerbs().from(3, 4).to(1, 2).drop(); + List<Thread> startupThreads = new ArrayList<>(4); + for (int i = 0; i < 4; i++) + { + int threadNr = i + 1; + startupThreads.add(new Thread(() -> cluster.get(threadNr).startup())); + } + startupThreads.forEach(Thread::start); + startupThreads.forEach(SplitBrainTest::join); + return new Setup(cluster, drop1, drop2); + } + + private final class Setup implements AutoCloseable + { + final Cluster cluster; + final IMessageFilters.Filter[] filters; + + Setup(Cluster cluster, IMessageFilters.Filter ... 
filters) + { + this.cluster = cluster; + this.filters = filters; + } + + void reenableCommunication() + { + for (IMessageFilters.Filter filter : filters) + filter.off(); + } + + @Override + public void close() + { + cluster.close(); } } @@ -76,4 +198,5 @@ public class SplitBrainTest extends TestBaseImpl throw new RuntimeException(e); } } + } diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java b/test/unit/org/apache/cassandra/gms/SerializationsTest.java index 1dc44345de..c7320b194e 100644 --- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java @@ -87,6 +87,7 @@ public class SerializationsTest extends AbstractSerializationsTester GossipDigestAck2 ack2 = new GossipDigestAck2(states); GossipDigestSyn syn = new GossipDigestSyn("Not a real cluster name", ClusterMetadata.current().tokenMap.partitioner().getClass().getCanonicalName(), + 20240430, Statics.Digests); DataOutputStreamPlus out = getOutput("gms.Gossip.bin"); @@ -113,8 +114,8 @@ public class SerializationsTest extends AbstractSerializationsTester int count = 0; FileInputStreamPlus in = getInput("gms.Gossip.bin"); - while (count < Statics.Digests.size()) - assert GossipDigestAck2.serializer.deserialize(in, getVersion()) != null; + while (count++ < Statics.Digests.size()) + assert GossipDigest.serializer.deserialize(in, getVersion()) != null; assert GossipDigestAck.serializer.deserialize(in, getVersion()) != null; assert GossipDigestAck2.serializer.deserialize(in, getVersion()) != null; assert GossipDigestSyn.serializer.deserialize(in, getVersion()) != null; @@ -130,7 +131,7 @@ public class SerializationsTest extends AbstractSerializationsTester private static VersionedValue vv0 = vvFact.load(23d); private static VersionedValue vv1 = vvFact.bootstrapping(Collections.<Token>singleton(partitioner.getRandomToken())); private static List<GossipDigest> Digests = new ArrayList<GossipDigest>(); - + static { 
HeartbeatSt.updateHeartBeat(); EndpointSt.addApplicationState(ApplicationState.LOAD, vv0); diff --git a/test/unit/org/apache/cassandra/net/HandshakeTest.java b/test/unit/org/apache/cassandra/net/HandshakeTest.java index 99e33eea90..b66ade056e 100644 --- a/test/unit/org/apache/cassandra/net/HandshakeTest.java +++ b/test/unit/org/apache/cassandra/net/HandshakeTest.java @@ -57,6 +57,7 @@ import static org.apache.cassandra.net.MessagingService.minimum_version; import static org.apache.cassandra.net.OutboundConnectionInitiator.Result; import static org.apache.cassandra.net.OutboundConnectionInitiator.SslFallbackConnectionType; import static org.apache.cassandra.net.OutboundConnectionInitiator.initiateMessaging; +import static org.apache.cassandra.tcm.ClusterMetadata.EMPTY_METADATA_IDENTIFIER; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -317,7 +318,7 @@ public class HandshakeTest .withDebugCallbacks(new HandshakeAcknowledgeChecker(t -> handshakeEx = t)) .withFrom(FROM_ADDR); OutboundConnections outboundConnections = OutboundConnections.tryRegister(new ConcurrentHashMap<>(), TO_ADDR, settings); - GossipDigestSyn syn = new GossipDigestSyn("cluster", "partitioner", new ArrayList<>(0)); + GossipDigestSyn syn = new GossipDigestSyn("cluster", "partitioner", EMPTY_METADATA_IDENTIFIER, new ArrayList<>(0)); Message<GossipDigestSyn> message = Message.out(Verb.GOSSIP_DIGEST_SYN, syn); OutboundConnection outboundConnection = outboundConnections.connectionFor(message); outboundConnection.enqueue(message); diff --git a/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java b/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java index 6acdf0ce21..5afbe88315 100644 --- a/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java +++ b/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java @@ -44,6 +44,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; 
import static org.apache.cassandra.net.MessagingService.current_version; import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD; +import static org.apache.cassandra.tcm.ClusterMetadata.EMPTY_METADATA_IDENTIFIER; public class OutboundConnectionsTest { @@ -96,7 +97,7 @@ public class OutboundConnectionsTest @Test public void getConnection_Gossip() { - GossipDigestSyn syn = new GossipDigestSyn("cluster", "partitioner", new ArrayList<>(0)); + GossipDigestSyn syn = new GossipDigestSyn("cluster", "partitioner", EMPTY_METADATA_IDENTIFIER, new ArrayList<>(0)); Message<GossipDigestSyn> message = Message.out(Verb.GOSSIP_DIGEST_SYN, syn); Assert.assertEquals(ConnectionType.URGENT_MESSAGES, connections.connectionFor(message).type()); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org