iamaleksey commented on code in PR #4541:
URL: https://github.com/apache/cassandra/pull/4541#discussion_r2665312483


##########
src/java/org/apache/cassandra/replication/TrackedWriteRequest.java:
##########
@@ -95,23 +96,45 @@ public static AbstractWriteResponseHandler<?> perform(
             if (logger.isTraceEnabled())
                 logger.trace("Remote tracked request {} {}", mutation, plan);
             writeMetrics.remoteRequests.mark();
-            return ForwardedWrite.forwardMutation(mutation, plan, rs, requestTime);
+
+            if (mutation instanceof CounterMutation)
+                return ForwardedWrite.forwardCounterMutation((CounterMutation) mutation, plan, rs, requestTime);
+            else
+                return ForwardedWrite.forwardMutation((Mutation) mutation, plan, rs, requestTime);
         }
 
         if (logger.isTraceEnabled())
             logger.trace("Local tracked request {} {}", mutation, plan);
         writeMetrics.localRequests.mark();
+
         MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token);
-        mutation = mutation.withMutationId(id);
 
-        if (logger.isTraceEnabled())
-            logger.trace("Write replication plan for mutation {}: live={}, pending={}, all={}",
-                         id, plan.live(), plan.pending(), plan.contacts());
+        if (mutation instanceof CounterMutation)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Write replication plan for counter mutation {}: live={}, pending={}, all={}",
+                             id, plan.live(), plan.pending(), plan.contacts());
+
+            TrackedWriteResponseHandler handler =
+                TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, null, WriteType.COUNTER, null, requestTime), id);
+
+            Mutation result = ((CounterMutation) mutation).applyCounterMutation(id);

Review Comment:
   All in all, the PR is not far from what needs to be done. The primary change
   needed is the execution context of the counter mutation itself:
   `applyCounterMutation()` should be submitted to the `Stage.COUNTER_MUTATION`
   thread pool for execution instead of being run in place here, similar to how
   we execute regular mutations in `Stage.MUTATION` via the
   `applyMutationLocally()` method.
   
   Then, once you have the resulting mutation to propagate, I suggest invoking
   `sendToReplicasOnly()` from that runnable.
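
   A minimal sketch of the shape suggested here, using plain
   `java.util.concurrent` rather than Cassandra's own classes. Everything in it
   is a hypothetical stand-in: `COUNTER_STAGE` plays the role of
   `Stage.COUNTER_MUTATION`, and the two lambdas passed to `performOnStage`
   play the roles of `applyCounterMutation()` and `sendToReplicasOnly()`. It
   only illustrates that both steps run inside the submitted task, not on the
   calling thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical illustration only; none of these names are Cassandra APIs.
final class CounterStageSketch
{
    // Stand-in for the Stage.COUNTER_MUTATION thread pool (assumption).
    static final ExecutorService COUNTER_STAGE = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "counter-mutation-stage");
        t.setDaemon(true); // do not keep the JVM alive for this sketch
        return t;
    });

    /**
     * Submits the counter application to the stage instead of running it on
     * the caller's thread, then propagates the result from the same task.
     *
     * @param applyCounterMutation stand-in for applyCounterMutation()
     * @param sendToReplicasOnly   stand-in for sendToReplicasOnly()
     */
    static <M> CompletableFuture<M> performOnStage(Supplier<M> applyCounterMutation,
                                                   Consumer<M> sendToReplicasOnly)
    {
        return CompletableFuture.supplyAsync(() -> {
            // Runs on a stage thread: apply the counter mutation locally.
            M result = applyCounterMutation.get();
            // Still on the stage thread: forward the resulting mutation.
            sendToReplicasOnly.accept(result);
            return result;
        }, COUNTER_STAGE);
    }
}
```

   In the real code the response handler would presumably be returned to the
   caller right away while the stage task completes in the background; the
   future above is just a convenient way to observe completion in a sketch.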



##########
src/java/org/apache/cassandra/db/CounterMutation.java:
##########
@@ -173,6 +188,12 @@ public Mutation applyCounterMutation() throws WriteTimeoutException
                 resultBuilder.add(processModifications(upd));
 
             Mutation result = resultBuilder.build();
+
+            // For tracked keyspaces, assign the mutation ID to the result before
+            // calling result.apply() since applyInternalTracked() requires an ID
+            if (mutationId != null && !mutationId.isNone())
+                result = result.withMutationId(mutationId);

Review Comment:
   I suggest setting the correct id earlier, in the builder, instead of building
   a mutation object with the wrong id, discarding it, and creating a new
   mutation with only the id changed. I think the cleanest way would be to pass
   the correct mutation id to the `PartitionUpdateCollector` constructor.
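
   To illustrate the difference, here is a small self-contained sketch. The
   classes `SketchMutation` and `SketchCollector`, the `NONE_ID` constant, and
   the `allocations` counter are all hypothetical stand-ins, not the real
   `Mutation` or `PartitionUpdateCollector`; the sketch only contrasts
   "build, then copy with a new id" against "give the builder the id up front".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; these are not Cassandra classes.
final class MutationIdSketch
{
    static final long NONE_ID = 0L; // stand-in for a "no id" marker (assumption)
    static int allocations = 0;     // counts SketchMutation objects constructed

    static final class SketchMutation
    {
        final long id;
        final List<String> updates;

        SketchMutation(long id, List<String> updates)
        {
            this.id = id;
            this.updates = updates;
            allocations++;
        }

        // Mirrors the "copy with only the id changed" pattern from the diff.
        SketchMutation withMutationId(long newId)
        {
            return new SketchMutation(newId, updates);
        }
    }

    static final class SketchCollector
    {
        private final long id;
        private final List<String> updates = new ArrayList<>();

        // The id is supplied at construction, so build() needs no later fix-up.
        SketchCollector(long id)
        {
            this.id = id;
        }

        SketchCollector add(String update)
        {
            updates.add(update);
            return this;
        }

        SketchMutation build()
        {
            return new SketchMutation(id, new ArrayList<>(updates));
        }
    }
}
```

   Building with `new SketchCollector(NONE_ID)` and then calling
   `withMutationId(id)` constructs two mutation objects and discards the first,
   whereas `new SketchCollector(id)` constructs one.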



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

