This is an automated email from the ASF dual-hosted git repository.
aber pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-45-mutation-tracking by this push:
new 70809160af Fixup inter-DC forwarding of writes from a coordinator-replica (fixup)
70809160af is described below
commit 70809160afb8dfd4d610865ff6337af3d9bf6f63
Author: Abe Ratnofsky <[email protected]>
AuthorDate: Wed May 7 11:28:09 2025 -0400
Fixup inter-DC forwarding of writes from a coordinator-replica (fixup)
Patch by Abe Ratnofsky; Reviewed by Aleksey Yeshchenko for CASSANDRA-20336
---
.../org/apache/cassandra/net/RequestCallbacks.java | 8 +---
.../cassandra/replication/TrackedWriteRequest.java | 7 +++-
.../MutationTrackingWriteForwardingTest.java | 44 ++++++++++++++++------
3 files changed, 39 insertions(+), 20 deletions(-)
diff --git a/src/java/org/apache/cassandra/net/RequestCallbacks.java b/src/java/org/apache/cassandra/net/RequestCallbacks.java
index 4a46b73b5a..5ec59d29e7 100644
--- a/src/java/org/apache/cassandra/net/RequestCallbacks.java
+++ b/src/java/org/apache/cassandra/net/RequestCallbacks.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
import org.slf4j.Logger;
@@ -36,7 +35,6 @@ import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.metrics.InternodeOutboundMetrics;
-import org.apache.cassandra.replication.ForwardedWrite;
import org.apache.cassandra.service.AbstractWriteResponseHandler;
import static java.lang.String.format;
@@ -97,17 +95,13 @@ public class RequestCallbacks implements
OutboundMessageCallbacks
*/
public void addWithExpiration(RequestCallback<?> cb, Message<?> message,
InetAddressAndPort to)
{
- // mutations need to call the overload
- Preconditions.checkArgument((message.verb() != Verb.MUTATION_REQ &&
message.verb() != Verb.COUNTER_MUTATION_REQ) || (cb instanceof
ForwardedWrite.LeaderCallback));
CallbackInfo previous = callbacks.put(key(message.id(), to), new
CallbackInfo(message, to, cb));
assert previous == null : format("Callback already exists for id
%d/%s! (%s)", message.id(), to, previous);
}
public void addWithExpiration(AbstractWriteResponseHandler<?> cb,
Message<?> message, Replica to)
{
- Preconditions.checkArgument(message.verb() == Verb.MUTATION_REQ ||
message.verb() == Verb.COUNTER_MUTATION_REQ || message.verb() ==
Verb.PAXOS_COMMIT_REQ || message.verb() == Verb.FORWARDING_WRITE);
- CallbackInfo previous = callbacks.put(key(message.id(),
to.endpoint()), new CallbackInfo(message, to.endpoint(), cb));
- assert previous == null : format("Callback already exists for id
%d/%s! (%s)", message.id(), to.endpoint(), previous);
+ addWithExpiration(cb, message, to.endpoint());
}
@VisibleForTesting
diff --git a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
index ba58f4f1f7..9cc7b3241c 100644
--- a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
+++ b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
@@ -281,7 +281,12 @@ public class TrackedWriteRequest
for (Replica replica : forwardToReplicas)
{
-
MessagingService.instance().callbacks.addWithExpiration(handler, message,
replica.endpoint());
+ if (handler instanceof TrackedWriteResponseHandler)
+
MessagingService.instance().callbacks.addWithExpiration((TrackedWriteResponseHandler)
handler, message, replica);
+ else if (handler instanceof ForwardedWrite.LeaderCallback)
+
MessagingService.instance().callbacks.addWithExpiration(handler, message,
replica.endpoint());
+ else
+ throw new IllegalStateException();
logger.trace("Adding FWD message to {}@{}", message.id(),
replica);
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java
index 108451ff81..9eb19a5fc9 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java
@@ -19,7 +19,9 @@
package org.apache.cassandra.distributed.test.tracking;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.junit.Test;
@@ -45,8 +47,13 @@ import static
org.apache.cassandra.distributed.shared.NetworkTopology.networkTop
public class MutationTrackingWriteForwardingTest extends TestBaseImpl
{
- private static final int NODES = 3;
- private static final int RF = 1;
+ // Need a mix of local and remote replicas, multiple remote replicas to
ensure inter-DC forwarding hits both
+ // paths for chosen replica in a remote DC, and all other replicas in a
remote DC.
+ // Include an extra node to ensure non-replicas behave correctly.
+ private static final int NODES = 5;
+ private static final int RF_PER_DC = 2;
+ private static final int NUM_DCS = 2;
+ private static final int TOTAL_RF = RF_PER_DC * NUM_DCS;
private static int inst(int i)
{
@@ -57,7 +64,7 @@ public class MutationTrackingWriteForwardingTest extends
TestBaseImpl
public void testBasicWriteForwarding() throws Throwable
{
// 2 DCs, 1 replica in each, to test forwarding to instances in remote
DCs and local DCs
- Map<Integer, NetworkTopology.DcAndRack> topology = networkTopology(3,
(nodeid) -> nodeid % 2 == 1 ? dcAndRack("dc1", "rack1") : dcAndRack("dc2",
"rack2"));
+ Map<Integer, NetworkTopology.DcAndRack> topology = networkTopology(5,
(nodeid) -> nodeid % 2 == 1 ? dcAndRack("dc1", "rack1") : dcAndRack("dc2",
"rack2"));
// TODO: disable background reconciliation so we can test that writes
are reconciling immediately
try (Cluster cluster = Cluster.build(NODES)
@@ -71,7 +78,7 @@ public class MutationTrackingWriteForwardingTest extends
TestBaseImpl
String keyspaceName = "basic_write_forwarding_test";
String tableName = "tbl";
cluster.schemaChange(format("CREATE KEYSPACE %s WITH replication =
" +
- "{'class': 'NetworkTopologyStrategy',
'replication_factor': " + RF + "} " +
+ "{'class': 'NetworkTopologyStrategy',
'replication_factor': " + RF_PER_DC + "} " +
"AND replication_type='tracked';",
keyspaceName));
cluster.schemaChange(format("CREATE TABLE %s.%s (k int, c int, v
int, primary key (k, c));", keyspaceName, tableName));
@@ -83,9 +90,10 @@ public class MutationTrackingWriteForwardingTest extends
TestBaseImpl
cluster.coordinator(inst(inserted)).execute(format("INSERT
INTO %s.%s (k, c, v) VALUES (?, ?, ?)", keyspaceName, tableName),
ConsistencyLevel.ALL, inserted, inserted, inserted);
// Writes should be ack'd in the journal too, but these could
lag behind client acks, so could be
- // permissive here. Each write should be reconciled on the
leader, unreconciled on the replica (until
- // background reconciliation broadcast is implemented), and
ignored on others.
- IInstance replica = null;
+ // permissive here. Each write should be reconciled on 1
leader, unreconciled on TOTAL_RF-1
+ // replicas (until background reconciliation broadcast is
implemented), and ignored on others.
+ Set<IInvokableInstance> leaderOrNonReplica = new HashSet<>(2);
+ Set<IInvokableInstance> replicas = new HashSet<>();
for (IInvokableInstance instance : cluster)
{
int unreconciled = instance.callOnInstance(() -> {
@@ -97,20 +105,32 @@ public class MutationTrackingWriteForwardingTest extends
TestBaseImpl
});
int lastUnreconciled =
instanceUnreconciled.getOrDefault(instance, 0);
int newUnreconciled = unreconciled - lastUnreconciled;
- if (newUnreconciled == 1)
+
+ if (newUnreconciled == 0)
+ {
+ // instance already reconciled (as leader) or did not
receive new mutation ID (non-replica)
+ leaderOrNonReplica.add(instance);
+ }
+ else if (newUnreconciled == 1)
+ {
+ // instance has not reconciled, so it's a replica
(until reconciliation broadcast is implemented)
+ replicas.add(instance);
+ }
+ else
{
- Assertions.assertThat(replica).isNull();
- replica = instance;
+ Assertions.fail("Should not have more than one new
unreconciled mutation");
}
instanceUnreconciled.put(instance, unreconciled);
}
- Assertions.assertThat(replica).isNotNull();
+ Assertions.assertThat(leaderOrNonReplica).hasSize(2);
+ Assertions.assertThat(replicas).hasSize(TOTAL_RF - 1);
}
Assertions.assertThat(instanceUnreconciled).matches(map -> {
int sum = 0;
for (Integer value : map.values())
sum += value;
- return sum == ROWS;
+ // Each write is reconciled on the leader, unreconciled on all
other replicas
+ return sum == (ROWS * (TOTAL_RF - 1));
});
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]