This is an automated email from the ASF dual-hosted git repository.
aleksey pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-45-mutation-tracking by
this push:
new dc13f327eb Fix-up offset broadcasting logic
dc13f327eb is described below
commit dc13f327ebc0a6685aa54e4d8f449464b2652934
Author: Aleksey Yeschenko <[email protected]>
AuthorDate: Wed May 7 18:24:58 2025 +0100
Fix-up offset broadcasting logic
patch by Aleksey Yeschenko; reviewed by Abe Ratnofsky for CASSANDRA-20576
---
.../cassandra/replication/CoordinatorLog.java | 21 +++---
.../apache/cassandra/replication/Participants.java | 6 ++
.../MutationTrackingWriteForwardingTest.java | 81 +++++++++-------------
3 files changed, 48 insertions(+), 60 deletions(-)
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index 7dad88cf97..0e4bdb7cdb 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -84,7 +84,7 @@ public abstract class CoordinatorLog
if (!getLocal().contains(mutationId.offset()))
return; // local host hasn't witnessed yet -> no cleanup needed
- if (hasWrittenToRemoteReplicas(mutationId.offset()))
+ if (remoteReplicasWitnessed(mutationId.offset()))
{
logger.trace("marking mutation {} as fully reconciled",
mutationId);
// if all replicas have now witnessed the id, remove it from
the index
@@ -107,9 +107,8 @@ public abstract class CoordinatorLog
{
for (int offset = start; offset <= end; ++offset)
{
- // TODO (desired): skip checking the host's offsets - all
just added
// TODO (desired): use the fact that Offsets are ordered
to optimise this look up
- if (hasWrittenLocally(offset) &&
hasWrittenToRemoteReplicas(offset))
+ if (othersWitnessed(offset, onHostId))
{
reconciledOffsets.add(offset);
unreconciledMutations().remove(offset);
@@ -167,7 +166,7 @@ public abstract class CoordinatorLog
unreconciledMutations().finishWriting(mutation);
- if (hasWrittenToRemoteReplicas(offset))
+ if (remoteReplicasWitnessed(offset))
{
reconciledOffsets.add(offset);
unreconciledMutations().remove(offset);
@@ -179,22 +178,22 @@ public abstract class CoordinatorLog
}
}
- private boolean hasWrittenLocally(int offset)
- {
- return getLocal().contains(offset);
- }
-
- private boolean hasWrittenToRemoteReplicas(int offset)
+ private boolean othersWitnessed(int offset, int exceptHostId)
{
for (int i = 0; i < participants.size(); ++i)
{
int hostId = participants.get(i);
- if (hostId != localHostId && !get(hostId).contains(offset))
+ if (hostId != exceptHostId && !get(hostId).contains(offset))
return false;
}
return true;
}
+ private boolean remoteReplicasWitnessed(int offset)
+ {
+ return othersWitnessed(offset, localHostId);
+ }
+
/**
* Look up unreconciled sequence ids of mutations witnessed by this host
in this coordinator log.
* Adds the ids to the supplied collection, so it can be reused to
aggregate lookups for multiple logs.
diff --git a/src/java/org/apache/cassandra/replication/Participants.java
b/src/java/org/apache/cassandra/replication/Participants.java
index 494dd3adca..f245797586 100644
--- a/src/java/org/apache/cassandra/replication/Participants.java
+++ b/src/java/org/apache/cassandra/replication/Participants.java
@@ -57,4 +57,10 @@ public class Participants
throw new IllegalArgumentException("Out of bounds host idx " +
idx);
return hosts[idx];
}
+
+ @Override
+ public String toString()
+ {
+ return Arrays.toString(hosts);
+ }
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java
index 9eb19a5fc9..6209c1c1b6 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java
@@ -15,14 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.cassandra.distributed.test.tracking;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
+import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -31,7 +28,6 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
-import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.distributed.test.TestBaseImpl;
@@ -39,7 +35,6 @@ import org.apache.cassandra.replication.MutationSummary;
import org.apache.cassandra.replication.MutationTrackingService;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
-import org.assertj.core.api.Assertions;
import static java.lang.String.format;
import static
org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
@@ -64,7 +59,7 @@ public class MutationTrackingWriteForwardingTest extends
TestBaseImpl
public void testBasicWriteForwarding() throws Throwable
{
// 2 DCs, 1 replica in each, to test forwarding to instances in remote
DCs and local DCs
- Map<Integer, NetworkTopology.DcAndRack> topology = networkTopology(5,
(nodeid) -> nodeid % 2 == 1 ? dcAndRack("dc1", "rack1") : dcAndRack("dc2",
"rack2"));
+ Map<Integer, NetworkTopology.DcAndRack> topology =
networkTopology(NODES, (nodeid) -> nodeid % 2 == 1 ? dcAndRack("dc1", "rack1")
: dcAndRack("dc2", "rack2"));
// TODO: disable background reconciliation so we can test that writes
are reconciling immediately
try (Cluster cluster = Cluster.build(NODES)
@@ -82,56 +77,44 @@ public class MutationTrackingWriteForwardingTest extends
TestBaseImpl
"AND replication_type='tracked';",
keyspaceName));
cluster.schemaChange(format("CREATE TABLE %s.%s (k int, c int, v
int, primary key (k, c));", keyspaceName, tableName));
- Map<IInstance, Integer> instanceUnreconciled = new HashMap<>();
int ROWS = 100;
for (int inserted = 0; inserted < ROWS; inserted++)
{
// Writes should be completed for the client, regardless of
whether they are forwarded or not
cluster.coordinator(inst(inserted)).execute(format("INSERT
INTO %s.%s (k, c, v) VALUES (?, ?, ?)", keyspaceName, tableName),
ConsistencyLevel.ALL, inserted, inserted, inserted);
+ }
- // Writes should be ack'd in the journal too, but these could
lag behind client acks, so could be
- // permissive here. Each write should be reconciled on 1
leader, unreconciled on TOTAL_RF-1
- // replicas (until background reconciliation broadcast is
implemented), and ignored on others.
- Set<IInvokableInstance> leaderOrNonReplica = new HashSet<>(2);
- Set<IInvokableInstance> replicas = new HashSet<>();
- for (IInvokableInstance instance : cluster)
- {
- int unreconciled = instance.callOnInstance(() -> {
- Token token =
DatabaseDescriptor.getPartitioner().getMinimumToken();
- Range<Token> fullRange = new Range<>(token, token);
- TableId tableId =
Schema.instance.getTableMetadata(keyspaceName, tableName).id;
- MutationSummary summary =
MutationTrackingService.instance.createSummaryForRange(fullRange, tableId,
true);
- return summary.unreconciledIds();
- });
- int lastUnreconciled =
instanceUnreconciled.getOrDefault(instance, 0);
- int newUnreconciled = unreconciled - lastUnreconciled;
+ Thread.sleep(1000); // allow time for all offsets to be broadcasted
- if (newUnreconciled == 0)
- {
- // instance already reconciled (as leader) or did not
receive new mutation ID (non-replica)
- leaderOrNonReplica.add(instance);
- }
- else if (newUnreconciled == 1)
- {
- // instance has not reconciled, so it's a replica
(until reconciliation broadcast is implemented)
- replicas.add(instance);
- }
- else
- {
- Assertions.fail("Should not have more than one new
unreconciled mutation");
- }
- instanceUnreconciled.put(instance, unreconciled);
- }
- Assertions.assertThat(leaderOrNonReplica).hasSize(2);
- Assertions.assertThat(replicas).hasSize(TOTAL_RF - 1);
+ int allReconciled = 0;
+ int allUnreconciled = 0;
+
+ // Writes should be ack'd in the journal too, but these could lag
behind client acks, so could be
+ // permissive here. Each write should be reconciled everywhere.
+ for (IInvokableInstance instance : cluster)
+ {
+ int reconciled = instance.callOnInstance(() -> {
+ Token token =
DatabaseDescriptor.getPartitioner().getMinimumToken();
+ Range<Token> fullRange = new Range<>(token, token);
+ TableId tableId =
Schema.instance.getTableMetadata(keyspaceName, tableName).id;
+ MutationSummary summary =
MutationTrackingService.instance.createSummaryForRange(fullRange, tableId,
true);
+ return summary.reconciledIds();
+ });
+
+ int unreconciled = instance.callOnInstance(() -> {
+ Token token =
DatabaseDescriptor.getPartitioner().getMinimumToken();
+ Range<Token> fullRange = new Range<>(token, token);
+ TableId tableId =
Schema.instance.getTableMetadata(keyspaceName, tableName).id;
+ MutationSummary summary =
MutationTrackingService.instance.createSummaryForRange(fullRange, tableId,
true);
+ return summary.unreconciledIds();
+ });
+
+ allReconciled += reconciled;
+ allUnreconciled += unreconciled;
}
- Assertions.assertThat(instanceUnreconciled).matches(map -> {
- int sum = 0;
- for (Integer value : map.values())
- sum += value;
- // Each write is reconciled on the leader, unreconciled on all
other replicas
- return sum == (ROWS * (TOTAL_RF - 1));
- });
+
+ Assert.assertEquals(0, allUnreconciled);
+ Assert.assertEquals(ROWS * TOTAL_RF, allReconciled);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]