This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 6f79207c34e2d828e798a8a96df67aeaea2a45a1 Author: Marcus Eriksson <[email protected]> AuthorDate: Fri Apr 4 08:13:11 2025 +0200 Improve metadata log catch up with inter-DC mutation forwarding Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-20523 --- CHANGES.txt | 1 + .../cassandra/db/AbstractMutationVerbHandler.java | 28 +++++----- .../test/log/FetchLogFromPeersDCTest.java | 60 ++++++++++++++++++++++ 3 files changed, 75 insertions(+), 14 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 26eb4a062c..f74d385e83 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Improve metadata log catch up with inter-DC mutation forwarding (CASSANDRA-20523) * Support topology-safe changes to Datacenter & Rack for live nodes (CASSANDRA-20528) * Add SSTableIntervalTree latency metric (CASSANDRA-20502) * Ignore repetitions of semicolon in CQLSH (CASSANDRA-19956) diff --git a/src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java b/src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java index cfea7eb45c..fe3acdba06 100644 --- a/src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java @@ -54,15 +54,15 @@ public abstract class AbstractMutationVerbHandler<T extends IMutation> implement if (message.epoch().isAfter(Epoch.EMPTY)) { ClusterMetadata metadata = ClusterMetadata.current(); - metadata = checkTokenOwnership(metadata, message); - metadata = checkSchemaVersion(metadata, message); + metadata = checkTokenOwnership(metadata, message, respondTo); + metadata = checkSchemaVersion(metadata, message, respondTo); } applyMutation(message, respondTo); } abstract void applyMutation(Message<T> message, InetAddressAndPort respondToAddress); - private ClusterMetadata checkTokenOwnership(ClusterMetadata metadata, Message<T> message) + private ClusterMetadata checkTokenOwnership(ClusterMetadata metadata, Message<T> message, InetAddressAndPort respondTo) { 
String keyspace = message.payload.getKeyspaceName(); DecoratedKey key = message.payload.key(); @@ -75,13 +75,13 @@ public abstract class AbstractMutationVerbHandler<T extends IMutation> implement // since coordinator's routing may be more recent. if (!forToken.get().containsSelf()) { - metadata = ClusterMetadataService.instance().fetchLogFromPeerOrCMS(metadata, message.from(), message.epoch()); + metadata = ClusterMetadataService.instance().fetchLogFromPeerOrCMS(metadata, respondTo, message.epoch()); forToken = writePlacements(metadata, keyspace, key); } // Otherwise, coordinator and the replica agree about the placement of the givent token, so catch-up can be async else { - ClusterMetadataService.instance().fetchLogFromPeerOrCMSAsync(metadata, message.from(), message.epoch()); + ClusterMetadataService.instance().fetchLogFromPeerOrCMSAsync(metadata, respondTo, message.epoch()); } } @@ -89,8 +89,8 @@ public abstract class AbstractMutationVerbHandler<T extends IMutation> implement { StorageService.instance.incOutOfRangeOperationCount(); Keyspace.open(message.payload.getKeyspaceName()).metric.outOfRangeTokenWrites.inc(); - NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.SECONDS, logMessageTemplate, message.from(), key.getToken(), message.payload.getKeyspaceName()); - throw InvalidRoutingException.forWrite(message.from(), key.getToken(), metadata.epoch, message.payload); + NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.SECONDS, logMessageTemplate, respondTo, key.getToken(), message.payload.getKeyspaceName()); + throw InvalidRoutingException.forWrite(respondTo, key.getToken(), metadata.epoch, message.payload); } if (forToken.lastModified().isAfter(message.epoch())) @@ -103,7 +103,7 @@ public abstract class AbstractMutationVerbHandler<T extends IMutation> implement return metadata; } - private ClusterMetadata checkSchemaVersion(ClusterMetadata metadata, Message<T> message) + private ClusterMetadata checkSchemaVersion(ClusterMetadata metadata, 
Message<T> message, InetAddressAndPort respondTo) { if (SchemaConstants.isSystemKeyspace(message.payload.getKeyspaceName()) || message.epoch().is(metadata.epoch)) return metadata; @@ -121,10 +121,10 @@ public abstract class AbstractMutationVerbHandler<T extends IMutation> implement { // the partition update was serialized after the epoch we currently know, catch up and // make sure we've seen the epoch it has seen, otherwise fail request. - metadata = ClusterMetadataService.instance().fetchLogFromPeerOrCMS(metadata, message.from(), message.epoch()); + metadata = ClusterMetadataService.instance().fetchLogFromPeerOrCMS(metadata, respondTo, message.epoch()); if (pu.serializedAtEpoch.isAfter(metadata.epoch)) throw new IllegalStateException(String.format("Coordinator %s is still ahead after fetching log, our epoch = %s, their epoch = %s", - message.from(), + respondTo, metadata.epoch, message.epoch())); } } @@ -143,7 +143,7 @@ public abstract class AbstractMutationVerbHandler<T extends IMutation> implement { TCMMetrics.instance.coordinatorBehindSchema.mark(); throw new CoordinatorBehindException(String.format("Coordinator %s is behind, our epoch = %s, their epoch = %s", - message.from(), + respondTo, metadata.epoch, message.epoch())); } } @@ -151,7 +151,7 @@ public abstract class AbstractMutationVerbHandler<T extends IMutation> implement { TCMMetrics.instance.coordinatorBehindSchema.mark(); throw new CoordinatorBehindException(String.format("Schema mismatch, coordinator %s is behind, we're missing table %s.%s, our epoch = %s, their epoch = %s", - message.from(), + respondTo, pu.metadata().keyspace, pu.metadata().name, metadata.epoch, message.epoch())); @@ -165,13 +165,13 @@ public abstract class AbstractMutationVerbHandler<T extends IMutation> implement { TCMMetrics.instance.coordinatorBehindSchema.mark(); throw new CoordinatorBehindException(String.format("Schema mismatch, coordinator %s is behind, we're missing keyspace %s, our epoch = %s, their epoch = %s", - 
message.from(), + respondTo, keyspace, metadata.epoch, message.epoch())); } else { - metadata = ClusterMetadataService.instance().fetchLogFromPeerOrCMS(metadata, message.from(), message.epoch()); + metadata = ClusterMetadataService.instance().fetchLogFromPeerOrCMS(metadata, respondTo, message.epoch()); } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersDCTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersDCTest.java new file mode 100644 index 0000000000..b47108d1d0 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersDCTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test.log; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.shared.NetworkTopology; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.tcm.ClusterMetadata; + +import static org.apache.cassandra.net.Verb.TCM_FETCH_PEER_LOG_REQ; +import static org.apache.cassandra.net.Verb.TCM_REPLICATION; +import static org.junit.Assert.assertEquals; + +public class FetchLogFromPeersDCTest extends TestBaseImpl +{ + + @Test + public void catchupCoordinatorBehindTestPlacements() throws Exception + { + try (Cluster cluster = init(builder().withNodes(4).withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP)) + .withoutVNodes() + .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4)) + .withNodeIdTopology(NetworkTopology.networkTopology(4, (i) -> NetworkTopology.dcAndRack("dc" + (i <= 2 ? 
0 : 1), "rack" + i))) + .start())) + { + cluster.schemaChange(withKeyspace("alter keyspace %s with replication = {'class':'NetworkTopologyStrategy', 'dc0':2, 'dc1':2}")); + cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)")); + cluster.filters().inbound().verbs(TCM_REPLICATION.id).from(1).to(3, 4).drop(); + // don't allow the dc1 nodes to catch up from each other - we should catch up from the actual originator of the message: + cluster.filters().inbound().verbs(TCM_FETCH_PEER_LOG_REQ.id).from(3, 4).to(3,4).drop(); + cluster.get(1).schemaChangeInternal(withKeyspace("alter table %s.tbl with comment='abc'")); + cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (id) values (1)"), ConsistencyLevel.ALL); + long epoch = cluster.get(1).callOnInstance(() -> ClusterMetadata.current().epoch.getEpoch()); + cluster.forEach(i -> i.runOnInstance(() -> { + assertEquals(epoch, ClusterMetadata.current().epoch.getEpoch()); + })); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
