aratno commented on code in PR #4145:
URL: https://github.com/apache/cassandra/pull/4145#discussion_r2079972514
##########
test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java:
##########
@@ -82,56 +77,44 @@ public void testBasicWriteForwarding() throws Throwable
"AND replication_type='tracked';",
keyspaceName));
cluster.schemaChange(format("CREATE TABLE %s.%s (k int, c int, v
int, primary key (k, c));", keyspaceName, tableName));
- Map<IInstance, Integer> instanceUnreconciled = new HashMap<>();
int ROWS = 100;
for (int inserted = 0; inserted < ROWS; inserted++)
{
// Writes should be completed for the client, regardless of
whether they are forwarded or not
cluster.coordinator(inst(inserted)).execute(format("INSERT
INTO %s.%s (k, c, v) VALUES (?, ?, ?)", keyspaceName, tableName),
ConsistencyLevel.ALL, inserted, inserted, inserted);
+ }
- // Writes should be ack'd in the journal too, but these could
lag behind client acks, so could be
- // permissive here. Each write should be reconciled on 1
leader, unreconciled on TOTAL_RF-1
- // replicas (until background reconciliation broadcast is
implemented), and ignored on others.
- Set<IInvokableInstance> leaderOrNonReplica = new HashSet<>(2);
- Set<IInvokableInstance> replicas = new HashSet<>();
- for (IInvokableInstance instance : cluster)
- {
- int unreconciled = instance.callOnInstance(() -> {
- Token token =
DatabaseDescriptor.getPartitioner().getMinimumToken();
- Range<Token> fullRange = new Range<>(token, token);
- TableId tableId =
Schema.instance.getTableMetadata(keyspaceName, tableName).id;
- MutationSummary summary =
MutationTrackingService.instance.createSummaryForRange(fullRange, tableId,
true);
- return summary.unreconciledIds();
- });
- int lastUnreconciled =
instanceUnreconciled.getOrDefault(instance, 0);
- int newUnreconciled = unreconciled - lastUnreconciled;
+ Thread.sleep(1000); // allow time for all offsets to be broadcasted
- if (newUnreconciled == 0)
- {
- // instance already reconciled (as leader) or did not
receive new mutation ID (non-replica)
- leaderOrNonReplica.add(instance);
- }
- else if (newUnreconciled == 1)
- {
- // instance has not reconciled, so it's a replica
(until reconciliation broadcast is implemented)
- replicas.add(instance);
- }
- else
- {
- Assertions.fail("Should not have more than one new
unreconciled mutation");
- }
- instanceUnreconciled.put(instance, unreconciled);
- }
- Assertions.assertThat(leaderOrNonReplica).hasSize(2);
- Assertions.assertThat(replicas).hasSize(TOTAL_RF - 1);
+ int allReconciled = 0;
+ int allUnreconciled = 0;
+
+ // Writes should be ack'd in the journal too, but these could lag
behind client acks, so could be
+ // permissive here. Each write should be reconciled everywhere.
+ for (IInvokableInstance instance : cluster)
+ {
+ int reconciled = instance.callOnInstance(() -> {
+ Token token =
DatabaseDescriptor.getPartitioner().getMinimumToken();
+ Range<Token> fullRange = new Range<>(token, token);
+ TableId tableId =
Schema.instance.getTableMetadata(keyspaceName, tableName).id;
+ MutationSummary summary =
MutationTrackingService.instance.createSummaryForRange(fullRange, tableId,
true);
+ return summary.reconciledIds();
Review Comment:
Note for later - would be nice to have metrics to differentiate writes that
are reconciled locally on the leader, from in-band write acknowledgements on
replicas, or from broadcast.
##########
test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java:
##########
@@ -82,56 +77,44 @@ public void testBasicWriteForwarding() throws Throwable
"AND replication_type='tracked';",
keyspaceName));
cluster.schemaChange(format("CREATE TABLE %s.%s (k int, c int, v
int, primary key (k, c));", keyspaceName, tableName));
- Map<IInstance, Integer> instanceUnreconciled = new HashMap<>();
int ROWS = 100;
for (int inserted = 0; inserted < ROWS; inserted++)
{
// Writes should be completed for the client, regardless of
whether they are forwarded or not
cluster.coordinator(inst(inserted)).execute(format("INSERT
INTO %s.%s (k, c, v) VALUES (?, ?, ?)", keyspaceName, tableName),
ConsistencyLevel.ALL, inserted, inserted, inserted);
+ }
- // Writes should be ack'd in the journal too, but these could
lag behind client acks, so could be
- // permissive here. Each write should be reconciled on 1
leader, unreconciled on TOTAL_RF-1
- // replicas (until background reconciliation broadcast is
implemented), and ignored on others.
- Set<IInvokableInstance> leaderOrNonReplica = new HashSet<>(2);
- Set<IInvokableInstance> replicas = new HashSet<>();
- for (IInvokableInstance instance : cluster)
- {
- int unreconciled = instance.callOnInstance(() -> {
- Token token =
DatabaseDescriptor.getPartitioner().getMinimumToken();
- Range<Token> fullRange = new Range<>(token, token);
- TableId tableId =
Schema.instance.getTableMetadata(keyspaceName, tableName).id;
- MutationSummary summary =
MutationTrackingService.instance.createSummaryForRange(fullRange, tableId,
true);
- return summary.unreconciledIds();
- });
- int lastUnreconciled =
instanceUnreconciled.getOrDefault(instance, 0);
- int newUnreconciled = unreconciled - lastUnreconciled;
+ Thread.sleep(1000); // allow time for all offsets to be broadcasted
Review Comment:
Fine to merge as-is, but could replace this with a signal from each instance
that a full cycle of broadcast has been received, via a BlockingQueue or
semaphore.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]