iamaleksey commented on code in PR #4541:
URL: https://github.com/apache/cassandra/pull/4541#discussion_r2665312483
##########
src/java/org/apache/cassandra/replication/TrackedWriteRequest.java:
##########
@@ -95,23 +96,45 @@ public static AbstractWriteResponseHandler<?> perform(
if (logger.isTraceEnabled())
logger.trace("Remote tracked request {} {}", mutation, plan);
writeMetrics.remoteRequests.mark();
- return ForwardedWrite.forwardMutation(mutation, plan, rs,
requestTime);
+
+ if (mutation instanceof CounterMutation)
+ return ForwardedWrite.forwardCounterMutation((CounterMutation)
mutation, plan, rs, requestTime);
+ else
+ return ForwardedWrite.forwardMutation((Mutation) mutation,
plan, rs, requestTime);
}
if (logger.isTraceEnabled())
logger.trace("Local tracked request {} {}", mutation, plan);
writeMetrics.localRequests.mark();
+
MutationId id =
MutationTrackingService.instance.nextMutationId(keyspaceName, token);
- mutation = mutation.withMutationId(id);
- if (logger.isTraceEnabled())
- logger.trace("Write replication plan for mutation {}: live={},
pending={}, all={}",
- id, plan.live(), plan.pending(), plan.contacts());
+ if (mutation instanceof CounterMutation)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Write replication plan for counter mutation {}:
live={}, pending={}, all={}",
+ id, plan.live(), plan.pending(), plan.contacts());
+
+ TrackedWriteResponseHandler handler =
+
TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, null,
WriteType.COUNTER, null, requestTime), id);
+
+ Mutation result = ((CounterMutation)
mutation).applyCounterMutation(id);
Review Comment:
All in all the PR is not far from what needs to be done. The primary issue /
change that needs to be made is the context of execution of the counter
mutation itself. `applyCounterMutation()` should be submitted to
`Stage.COUNTER_MUTATION` thread pool for execution, instead of being done
in-place here. Similarly to how we execute regular mutations in
`Stage.MUTATION` via `applyMutationLocally()` method.
Then, once you have the resulting mutation to propagate, I suggest invoking
`sendToReplicasOnly()` from that runnable.
##########
src/java/org/apache/cassandra/db/CounterMutation.java:
##########
@@ -173,6 +188,12 @@ public Mutation applyCounterMutation() throws
WriteTimeoutException
resultBuilder.add(processModifications(upd));
Mutation result = resultBuilder.build();
+
+ // For tracked keyspaces, assign the mutation ID to the result
before
+ // calling result.apply() since applyInternalTracked() requires an
ID
+ if (mutationId != null && !mutationId.isNone())
+ result = result.withMutationId(mutationId);
Review Comment:
I suggest setting the correct id earlier, in the builder, instead of making
a mutation object with the wrong id, then discarding it, and creating a new
mutation with just the id changed. I think the cleanest way would be to pass
the correct mutation id to `PartitionUpdateCollector` constructor.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]