This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 6f79207c34e2d828e798a8a96df67aeaea2a45a1
Author: Marcus Eriksson <[email protected]>
AuthorDate: Fri Apr 4 08:13:11 2025 +0200

    Improve metadata log catch up with inter-DC mutation forwarding
    
    Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-20523
---
 CHANGES.txt                                        |  1 +
 .../cassandra/db/AbstractMutationVerbHandler.java  | 28 +++++-----
 .../test/log/FetchLogFromPeersDCTest.java          | 60 ++++++++++++++++++++++
 3 files changed, 75 insertions(+), 14 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 26eb4a062c..f74d385e83 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Improve metadata log catch up with inter-DC mutation forwarding (CASSANDRA-20523)
  * Support topology-safe changes to Datacenter & Rack for live nodes (CASSANDRA-20528)
  * Add SSTableIntervalTree latency metric (CASSANDRA-20502)
  * Ignore repetitions of semicolon in CQLSH (CASSANDRA-19956)
diff --git a/src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java b/src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java
index cfea7eb45c..fe3acdba06 100644
--- a/src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java
@@ -54,15 +54,15 @@ public abstract class AbstractMutationVerbHandler<T extends 
IMutation> implement
         if (message.epoch().isAfter(Epoch.EMPTY))
         {
             ClusterMetadata metadata = ClusterMetadata.current();
-            metadata = checkTokenOwnership(metadata, message);
-            metadata = checkSchemaVersion(metadata, message);
+            metadata = checkTokenOwnership(metadata, message, respondTo);
+            metadata = checkSchemaVersion(metadata, message, respondTo);
         }
         applyMutation(message, respondTo);
     }
 
     abstract void applyMutation(Message<T> message, InetAddressAndPort 
respondToAddress);
 
-    private ClusterMetadata checkTokenOwnership(ClusterMetadata metadata, 
Message<T> message)
+    private ClusterMetadata checkTokenOwnership(ClusterMetadata metadata, 
Message<T> message, InetAddressAndPort respondTo)
     {
         String keyspace = message.payload.getKeyspaceName();
         DecoratedKey key = message.payload.key();
@@ -75,13 +75,13 @@ public abstract class AbstractMutationVerbHandler<T extends 
IMutation> implement
             // since coordinator's routing may be more recent.
             if (!forToken.get().containsSelf())
             {
-                metadata = 
ClusterMetadataService.instance().fetchLogFromPeerOrCMS(metadata, 
message.from(), message.epoch());
+                metadata = 
ClusterMetadataService.instance().fetchLogFromPeerOrCMS(metadata, respondTo, 
message.epoch());
                 forToken = writePlacements(metadata, keyspace, key);
             }
             // Otherwise, coordinator and the replica agree about the 
placement of the givent token, so catch-up can be async
             else
             {
-                
ClusterMetadataService.instance().fetchLogFromPeerOrCMSAsync(metadata, 
message.from(), message.epoch());
+                
ClusterMetadataService.instance().fetchLogFromPeerOrCMSAsync(metadata, 
respondTo, message.epoch());
             }
         }
 
@@ -89,8 +89,8 @@ public abstract class AbstractMutationVerbHandler<T extends 
IMutation> implement
         {
             StorageService.instance.incOutOfRangeOperationCount();
             
Keyspace.open(message.payload.getKeyspaceName()).metric.outOfRangeTokenWrites.inc();
-            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, 
TimeUnit.SECONDS, logMessageTemplate, message.from(), key.getToken(), 
message.payload.getKeyspaceName());
-            throw InvalidRoutingException.forWrite(message.from(), 
key.getToken(), metadata.epoch, message.payload);
+            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, 
TimeUnit.SECONDS, logMessageTemplate, respondTo, key.getToken(), 
message.payload.getKeyspaceName());
+            throw InvalidRoutingException.forWrite(respondTo, key.getToken(), 
metadata.epoch, message.payload);
         }
 
         if (forToken.lastModified().isAfter(message.epoch()))
@@ -103,7 +103,7 @@ public abstract class AbstractMutationVerbHandler<T extends 
IMutation> implement
         return metadata;
     }
 
-    private ClusterMetadata checkSchemaVersion(ClusterMetadata metadata, 
Message<T> message)
+    private ClusterMetadata checkSchemaVersion(ClusterMetadata metadata, 
Message<T> message, InetAddressAndPort respondTo)
     {
         if 
(SchemaConstants.isSystemKeyspace(message.payload.getKeyspaceName()) || 
message.epoch().is(metadata.epoch))
             return metadata;
@@ -121,10 +121,10 @@ public abstract class AbstractMutationVerbHandler<T 
extends IMutation> implement
                     {
                         // the partition update was serialized after the epoch 
we currently know, catch up and
                         // make sure we've seen the epoch it has seen, 
otherwise fail request.
-                        metadata = 
ClusterMetadataService.instance().fetchLogFromPeerOrCMS(metadata, 
message.from(), message.epoch());
+                        metadata = 
ClusterMetadataService.instance().fetchLogFromPeerOrCMS(metadata, respondTo, 
message.epoch());
                         if (pu.serializedAtEpoch.isAfter(metadata.epoch))
                             throw new 
IllegalStateException(String.format("Coordinator %s is still ahead after 
fetching log, our epoch = %s, their epoch = %s",
-                                                                          
message.from(),
+                                                                          
respondTo,
                                                                           
metadata.epoch, message.epoch()));
                     }
                 }
@@ -143,7 +143,7 @@ public abstract class AbstractMutationVerbHandler<T extends 
IMutation> implement
                         {
                             TCMMetrics.instance.coordinatorBehindSchema.mark();
                             throw new 
CoordinatorBehindException(String.format("Coordinator %s is behind, our epoch = 
%s, their epoch = %s",
-                                                                               
message.from(),
+                                                                               
respondTo,
                                                                                
metadata.epoch, message.epoch()));
                         }
                     }
@@ -151,7 +151,7 @@ public abstract class AbstractMutationVerbHandler<T extends 
IMutation> implement
                     {
                         TCMMetrics.instance.coordinatorBehindSchema.mark();
                         throw new 
CoordinatorBehindException(String.format("Schema mismatch, coordinator %s is 
behind, we're missing table %s.%s, our epoch = %s, their epoch = %s",
-                                                                           
message.from(),
+                                                                           
respondTo,
                                                                            
pu.metadata().keyspace,
                                                                            
pu.metadata().name,
                                                                            
metadata.epoch, message.epoch()));
@@ -165,13 +165,13 @@ public abstract class AbstractMutationVerbHandler<T 
extends IMutation> implement
             {
                 TCMMetrics.instance.coordinatorBehindSchema.mark();
                 throw new CoordinatorBehindException(String.format("Schema 
mismatch, coordinator %s is behind, we're missing keyspace %s, our epoch = %s, 
their epoch = %s",
-                                                                   
message.from(),
+                                                                   respondTo,
                                                                    keyspace,
                                                                    
metadata.epoch, message.epoch()));
             }
             else
             {
-                metadata = 
ClusterMetadataService.instance().fetchLogFromPeerOrCMS(metadata, 
message.from(), message.epoch());
+                metadata = 
ClusterMetadataService.instance().fetchLogFromPeerOrCMS(metadata, respondTo, 
message.epoch());
             }
         }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersDCTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersDCTest.java
new file mode 100644
index 0000000000..b47108d1d0
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersDCTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.log;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.tcm.ClusterMetadata;
+
+import static org.apache.cassandra.net.Verb.TCM_FETCH_PEER_LOG_REQ;
+import static org.apache.cassandra.net.Verb.TCM_REPLICATION;
+import static org.junit.Assert.assertEquals;
+
+public class FetchLogFromPeersDCTest extends TestBaseImpl
+{
+
+    @Test
+    public void catchupCoordinatorBehindTestPlacements() throws Exception
+    {
+        try (Cluster cluster = init(builder().withNodes(4).withConfig(c -> 
c.with(Feature.NETWORK, Feature.GOSSIP))
+                                             .withoutVNodes()
+                                             
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
+                                             
.withNodeIdTopology(NetworkTopology.networkTopology(4, (i) -> 
NetworkTopology.dcAndRack("dc" + (i <= 2 ? 0 : 1), "rack" + i)))
+                                             .start()))
+        {
+            cluster.schemaChange(withKeyspace("alter keyspace %s with 
replication = {'class':'NetworkTopologyStrategy', 'dc0':2, 'dc1':2}"));
+            cluster.schemaChange(withKeyspace("create table %s.tbl (id int 
primary key)"));
+            
cluster.filters().inbound().verbs(TCM_REPLICATION.id).from(1).to(3, 4).drop();
+            // don't allow the dc1 nodes to catch up from each other - we should catch up from the actual originator of the message:
+            
cluster.filters().inbound().verbs(TCM_FETCH_PEER_LOG_REQ.id).from(3, 
4).to(3,4).drop();
+            cluster.get(1).schemaChangeInternal(withKeyspace("alter table 
%s.tbl with comment='abc'"));
+            cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl 
(id) values (1)"), ConsistencyLevel.ALL);
+            long epoch = cluster.get(1).callOnInstance(() -> 
ClusterMetadata.current().epoch.getEpoch());
+            cluster.forEach(i -> i.runOnInstance(() -> {
+                assertEquals(epoch, 
ClusterMetadata.current().epoch.getEpoch());
+            }));
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to