This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-45-mutation-tracking by 
this push:
     new a8f2ce44a3 Add support for counters to mutation tracking
a8f2ce44a3 is described below

commit a8f2ce44a3837790ff4fd201e4a8b6ef70617830
Author: Aparna Naik <[email protected]>
AuthorDate: Mon Dec 22 13:04:02 2025 -0800

    Add support for counters to mutation tracking
    
    patch by Aparna Naik; reviewed by Aleksey Yeschenko for CASSANDRA-20959
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/CounterMutation.java   |  18 +-
 .../cassandra/db/CounterMutationVerbHandler.java   |  10 +
 .../cassandra/replication/ForwardedWrite.java      | 221 ++++++++++++-
 .../cassandra/replication/TrackedWriteRequest.java | 361 ++++++++++++++++-----
 .../org/apache/cassandra/service/StorageProxy.java |  70 +---
 .../test/TrackedCounterForwardingTest.java         |  83 +++++
 .../replication/TrackedCounterWriteTest.java       | 160 +++++++++
 8 files changed, 762 insertions(+), 162 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 8346d679fb..3a7c72353d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 cep-45-mutation-tracking
+ * Add support for counters to mutation tracking (CASSANDRA-20959)
  * CEP-45: Query forwarding (CASSANDRA-20309)
  * Fix mutation tracking startup (CASSANDRA-20540)
  * Mutation tracking journal integration, read, and write path 
(CASSANDRA-20304, CASSANDRA-20305, CASSANDRA-20308)
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java 
b/src/java/org/apache/cassandra/db/CounterMutation.java
index 5d6c604533..2f1ab4eabe 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -161,7 +161,23 @@ public class CounterMutation implements IMutation
      */
     public Mutation applyCounterMutation() throws WriteTimeoutException
     {
-        Mutation.PartitionUpdateCollector resultBuilder = new 
Mutation.PartitionUpdateCollector(id(), getKeyspaceName(), key());
+        return applyCounterMutation(null);
+    }
+
+    /**
+     * Applies the counter mutation with an optional mutation ID for tracked 
keyspaces.
+     *
+     * For tracked keyspaces, the mutation ID is assigned to the concrete 
result BEFORE
+     * it is applied, ensuring the concrete counter values (not the operation) 
are
+     * journaled and tracked with the ID.
+     *
+     * @param mutationId the mutation ID to assign to the concrete result; when 
null or none, this mutation's own id() is used
+     * @return the applied resulting Mutation (with ID if provided)
+     */
+    public Mutation applyCounterMutation(MutationId mutationId) throws 
WriteTimeoutException
+    {
+        MutationId idToUse = (mutationId != null && !mutationId.isNone()) ? 
mutationId : id();
+        Mutation.PartitionUpdateCollector resultBuilder = new 
Mutation.PartitionUpdateCollector(idToUse, getKeyspaceName(), key());
         Keyspace keyspace = Keyspace.open(getKeyspaceName());
 
         List<Lock> locks = new ArrayList<>();
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java 
b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index d0897afdd5..6d12551c6b 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.replication.ForwardedWrite;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.transport.Dispatcher;
 
@@ -38,6 +39,15 @@ public class CounterMutationVerbHandler extends 
AbstractMutationVerbHandler<Coun
         final CounterMutation cm = message.payload;
         logger.trace("Applying forwarded {}", cm);
 
+        Keyspace keyspace = Keyspace.open(cm.getKeyspaceName());
+
+        if (keyspace.getMetadata().useMutationTracking())
+        {
+            logger.trace("Applying tracked forwarded counter mutation {}", cm);
+            ForwardedWrite.applyForwardedCounterMutation(cm, message);
+            return;
+        }
+
         String localDataCenter = 
DatabaseDescriptor.getLocator().local().datacenter;
         // We should not wait for the result of the write in this thread,
         // otherwise we could have a distributed deadlock between replicas
diff --git a/src/java/org/apache/cassandra/replication/ForwardedWrite.java 
b/src/java/org/apache/cassandra/replication/ForwardedWrite.java
index e5e2bf8d8b..ba4aecb002 100644
--- a/src/java/org/apache/cassandra/replication/ForwardedWrite.java
+++ b/src/java/org/apache/cassandra/replication/ForwardedWrite.java
@@ -59,9 +59,98 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 
 /**
- * For a forwarded write there are 2 nodes involved in coordination, a 
coordinator and a leader. The coordinator is the
- * node that the client is communicating with, and the leader is the mutation 
replica that is handling the mutation
+ * Handles tracked writes where the coordinator is NOT a replica for the write.
+ *
+ * <p>For a forwarded write there are 2 nodes involved in coordination: a 
coordinator and a leader. The coordinator is
+ * the node that the client is communicating with, and the leader is the 
mutation replica that is handling the mutation
  * tracking for that write.
+ *
+ * <h2>Request/Response Flow</h2>
+ *
+ * <h3>Regular Writes (Mutation)</h3>
+ *
+ * <pre>
+ * Client                Coordinator           Leader Replica        Other 
Replicas
+ *   |                        |                       |                     |
+ *   |---Write Request------->|                       |                     |
+ *   |                        |                       |                     |
+ *   |                        |--FORWARD_WRITE_REQ--->|                     |
+ *   |                        |   (Mutation w/o ID)   |                     |
+ *   |                        |                       |                     |
+ *   |                        |                       |--Assign MutationId  |
+ *   |                        |                       |                     |
+ *   |                        |                       |--Apply locally----->|
+ *   |                        |                       |   (with ID)         |
+ *   |                        |                       |                     |
+ *   |                        |                       |--MUTATION_REQ------>|
+ *   |                        |                       |   (Mutation w/ ID + |
+ *   |                        |                       |    CoordinatorAck)  |
+ *   |                        |                       |                     |
+ *   |                        |                       |                     
|--Apply locally
+ *   |                        |                       |                     |
+ *   |                        |<------MUTATION_RSP (for CL)-----------------|
+ *   |                        |                       |<-MUTATION_RSP 
(tracking)-|
+ *   |<--Write Response-------|                       |                     |
+ *   |  (when CL satisfied)   |                       |                     |
+ *   |                        |                       |--Mark witnessed---->|
+ * </pre>
+ *
+ * <p><b>Key Points:</b>
+ * <ul>
+ *   <li>Coordinator selects a leader replica based on proximity and 
liveness</li>
+ *   <li>Coordinator sends mutation WITHOUT an ID to the leader</li>
+ *   <li>Leader assigns a MutationId using {@link 
MutationTrackingService#nextMutationId}</li>
+ *   <li>Leader applies mutation locally and forwards to other replicas</li>
+ *   <li>All replicas respond to coordinator for consistency level (using 
CoordinatorAckInfo)</li>
+ *   <li>Other replicas also respond to leader for mutation 
tracking/witnessing</li>
+ *   <li>Coordinator waits for CL responses before responding to client</li>
+ * </ul>
+ *
+ * <h3>Counter Writes (CounterMutation)</h3>
+ *
+ * <pre>
+ * Client                Coordinator           Leader Replica        Other 
Replicas
+ *   |                        |                       |                     |
+ *   |---Counter Write------->|                       |                     |
+ *   |                        |                       |                     |
+ *   |                        |--COUNTER_MUTATION_REQ>|                     |
+ *   |                        |  (CounterMutation w/o |                     |
+ *   |                        |   ID)                 |                     |
+ *   |                        |                       |                     |
+ *   |                        |                       |--Assign MutationId  |
+ *   |                        |                       |                     |
+ *   |                        |                       |--Apply counter----->|
+ *   |                        |                       |  mutation (converts |
+ *   |                        |                       |  CounterMutation to |
+ *   |                        |                       |  Mutation w/ ID)    |
+ *   |                        |                       |                     |
+ *   |                        |                       |--MUTATION_REQ------>|
+ *   |                        |                       |  (Mutation result + |
+ *   |                        |                       |   CoordinatorAck)   |
+ *   |                        |                       |                     |
+ *   |                        |                       |                     
|--Apply locally
+ *   |                        |                       |                     |
+ *   |                        |<------MUTATION_RSP (for CL)-----------------|
+ *   |                        |                       |<-MUTATION_RSP 
(tracking)-|
+ *   |<--Write Response-------|                       |                     |
+ *   |  (when CL satisfied)   |                       |                     |
+ *   |                        |                       |--Mark witnessed---->|
+ * </pre>
+ *
+ * <p><b>Key Points:</b>
+ * <ul>
+ *   <li>Coordinator selects a counter leader replica (prefers local DC)</li>
+ *   <li>Coordinator sends CounterMutation WITHOUT an ID to the leader</li>
+ *   <li>Leader assigns a MutationId using {@link 
MutationTrackingService#nextMutationId}</li>
+ *   <li>Leader applies counter mutation locally, which converts 
CounterMutation to a regular Mutation</li>
+ *   <li>Leader forwards the resulting Mutation (NOT CounterMutation) to other 
replicas</li>
+ *   <li>All replicas respond to coordinator for consistency level (using 
CoordinatorAckInfo)</li>
+ *   <li>Other replicas also respond to leader for mutation 
tracking/witnessing</li>
+ *   <li>Coordinator waits for CL responses before responding to client</li>
+ * </ul>
+ *
+ * @see TrackedWriteRequest for the flow when coordinator IS a replica
+ * @see MutationTrackingService for mutation ID assignment and witnessing
  */
 public class ForwardedWrite
 {
@@ -173,11 +262,8 @@ public class ForwardedWrite
                 {
                     if (remoteDCReplicas == null)
                         remoteDCReplicas = new HashMap<>();
-
-                    List<Replica> messages = remoteDCReplicas.get(dc);
-                    if (messages == null)
-                        messages = remoteDCReplicas.computeIfAbsent(dc, ignore 
-> new ArrayList<>(3)); // most DCs will have <= 3 replicas
-                    messages.add(replica);
+                    remoteDCReplicas.computeIfAbsent(dc, ignore -> new 
ArrayList<>(3)) // most DCs will have <= 3 replicas
+                                    .add(replica);
                 }
             }
 
@@ -241,6 +327,127 @@ public class ForwardedWrite
         return handler;
     }
 
+    /**
+     * Forward a tracked counter mutation to a replica leader for processing.
+     * The leader will assign a mutation ID, apply the counter mutation, and 
replicate the resulting Mutation to other replicas.
+     */
+    public static AbstractWriteResponseHandler<Object> 
forwardCounterMutation(CounterMutation counterMutation,
+                                                                              
ReplicaPlan.ForWrite plan,
+                                                                              
AbstractReplicationStrategy strategy,
+                                                                              
Dispatcher.RequestTime requestTime)
+    {
+        Preconditions.checkArgument(counterMutation.id().isNone(), 
"CounterMutation should not have an ID when forwarding");
+
+        ClusterMetadata cm = ClusterMetadata.current();
+        String localDataCenter = 
DatabaseDescriptor.getLocator().local().datacenter;
+
+        // Find the leader replica - prefer local DC replicas for counters
+        Replica leader;
+        try
+        {
+            leader = ReplicaPlans.findCounterLeaderReplica(cm, 
counterMutation.getKeyspaceName(),
+                                                           
counterMutation.key(),
+                                                           localDataCenter,
+                                                           
counterMutation.consistency());
+        }
+        catch (Exception e)
+        {
+            logger.error("Failed to find counter leader replica for tracked 
write", e);
+            throw e;
+        }
+
+        Preconditions.checkState(!leader.isSelf(), "Leader should not be self 
when forwarding counter mutation");
+        logger.trace("Forwarding tracked counter mutation to leader replica 
{}", leader);
+
+        // Create response handler for all replicas
+        AbstractWriteResponseHandler<Object> handler = 
strategy.getWriteResponseHandler(plan, null, WriteType.COUNTER, null, 
requestTime);
+
+        // Add callbacks for all live replicas to respond directly to 
coordinator
+        Message<CounterMutation> forwardMessage = 
Message.outWithRequestTime(Verb.COUNTER_MUTATION_REQ, counterMutation, 
requestTime);
+        for (Replica replica : plan.contacts())
+        {
+            if (plan.isAlive(replica))
+            {
+                logger.trace("Adding forwarding callback for tracked counter 
response from {} id {}", replica, forwardMessage.id());
+                
MessagingService.instance().callbacks.addWithExpiration(handler, 
forwardMessage, replica);
+            }
+            else
+            {
+                handler.expired();
+            }
+        }
+
+        // Send the counter mutation to the leader
+        MessagingService.instance().send(forwardMessage, leader.endpoint());
+
+        return handler;
+    }
+
+    /**
+     * Forward a mutation to a replica leader for processing.
+     * Dispatches to the appropriate method based on mutation type.
+     *
+     * @param mutation    the mutation to forward (can be Mutation or 
CounterMutation)
+     * @param plan        the replica plan
+     * @param strategy    the replication strategy
+     * @param requestTime the request time
+     * @return the write response handler
+     */
+    public static AbstractWriteResponseHandler<Object> forward(IMutation 
mutation,
+                                                               
ReplicaPlan.ForWrite plan,
+                                                               
AbstractReplicationStrategy strategy,
+                                                               
Dispatcher.RequestTime requestTime)
+    {
+        if (mutation instanceof CounterMutation)
+            return forwardCounterMutation((CounterMutation) mutation, plan, 
strategy, requestTime);
+        else
+            return forwardMutation((Mutation) mutation, plan, strategy, 
requestTime);
+    }
+
+    /**
+     * Apply a forwarded tracked counter mutation on the leader replica.
+     * Called by CounterMutationVerbHandler when receiving a forwarded counter 
write.
+     * <p>
+     * This method:
+     * 1. Creates CoordinatorAckInfo from the incoming message
+     * 2. Creates a LeaderCallback to track responses from replicas
+     * 3. Applies counter mutation locally with generated mutation ID
+     * 4. Forwards result (Mutation not CounterMutation) to other replicas 
with CoordinatorAckInfo and LeaderCallback
+     * 5. Sends leader's response back to coordinator
+     *
+     * @param counterMutation the counter mutation to apply
+     * @param message the original message (contains coordinator address and 
message ID)
+     */
+    public static void applyForwardedCounterMutation(CounterMutation 
counterMutation, Message<CounterMutation> message)
+    {
+        CoordinatorAckInfo coordinatorAckInfo = 
CoordinatorAckInfo.toCoordinator(message.from(), message.id());
+
+        String keyspaceName = counterMutation.getKeyspaceName();
+        Token token = counterMutation.key().getToken();
+        Keyspace ks = Keyspace.open(keyspaceName);
+        ReplicaPlan.ForWrite plan = ReplicaPlans.forWrite(ks, 
counterMutation.consistency(), token, ReplicaPlans.writeAll);
+
+        MutationId id = 
MutationTrackingService.instance.nextMutationId(keyspaceName, token);
+
+        logger.trace("Forwarded counter mutation {}: applying locally with ID 
and forwarding to other replicas", id);
+
+        // Create LeaderCallback to track when replicas respond, allowing the 
leader
+        // to mark the mutation ID as witnessed on each replica proactively
+        LeaderCallback leaderCallback = new LeaderCallback(id, 
coordinatorAckInfo);
+
+        // Apply counter mutation with ID to get result
+        Mutation result = counterMutation.applyCounterMutation(id);
+
+        // Apply locally using the leader callback
+        TrackedWriteRequest.applyMutationLocally(result, leaderCallback);
+
+        // Send result to other replicas with CoordinatorAckInfo and 
LeaderCallback
+        // Replicas will respond to both the leader (for witnessing) and the 
coordinator (for CL)
+        TrackedWriteRequest.sendToReplicas(result, plan, leaderCallback, 
coordinatorAckInfo);
+
+        logger.trace("Tracked counter mutation {} processed, local application 
and replication initiated", id);
+    }
+
     public static final IVersionedSerializer<MutationRequest> serializer = new 
IVersionedSerializer<>()
     {
         @Override
diff --git a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java 
b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
index b45eae9f74..e47b8766e6 100644
--- a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
+++ b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.concurrent.DebuggableTask;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.CounterMutation;
 import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
@@ -64,8 +65,93 @@ import org.apache.cassandra.utils.MonotonicClock;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static 
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetrics;
+import static org.apache.cassandra.net.Verb.COUNTER_MUTATION_REQ;
 import static org.apache.cassandra.net.Verb.MUTATION_REQ;
 
+/**
+ * Handles tracked writes where the coordinator IS a replica for the write.
+ *
+ * <p>When the coordinator is a replica, it acts as the leader and directly 
assigns the MutationId,
+ * applies the mutation locally, and forwards to other replicas. This avoids 
the extra network hop
+ * compared to {@link ForwardedWrite}.
+ *
+ * <h2>Request/Response Flow</h2>
+ *
+ * <h3>Regular Writes (Mutation)</h3>
+ *
+ * <pre>
+ * Client            Coordinator/Leader         Other Replicas
+ *   |                      |                          |
+ *   |---Write Request----->|                          |
+ *   |                      |                          |
+ *   |                      |--Assign MutationId       |
+ *   |                      |                          |
+ *   |                      |--Apply locally---------->|
+ *   |                      |   (with ID)              |
+ *   |                      |                          |
+ *   |                      |--MUTATION_REQ----------->|
+ *   |                      |   (Mutation w/ ID)       |
+ *   |                      |                          |
+ *   |                      |                          |--Apply locally
+ *   |                      |                          |
+ *   |                      |<------MUTATION_RSP-------|
+ *   |                      |                          |
+ *   |<--Write Response-----|                          |
+ *   |  (when CL satisfied) |                          |
+ *   |                      |--Mark witnessed--------->|
+ * </pre>
+ *
+ * <p><b>Key Points:</b>
+ * <ul>
+ *   <li>Coordinator is a replica, so it acts as the leader</li>
+ *   <li>Coordinator assigns a MutationId using {@link 
MutationTrackingService#nextMutationId}</li>
+ *   <li>Coordinator applies mutation locally before sending to other 
replicas</li>
+ *   <li>Coordinator sends mutation WITH ID to other replicas</li>
+ *   <li>Replicas respond directly to coordinator for consistency level</li>
+ *   <li>Coordinator tracks witnessing for all replicas including itself</li>
+ *   <li>Client response sent when CL is satisfied</li>
+ * </ul>
+ *
+ * <h3>Counter Writes (CounterMutation)</h3>
+ *
+ * <pre>
+ * Client            Coordinator/Leader         Other Replicas
+ *   |                      |                          |
+ *   |---Counter Write----->|                          |
+ *   |                      |                          |
+ *   |                      |--Assign MutationId       |
+ *   |                      |                          |
+ *   |                      |--Apply counter---------->|
+ *   |                      |  mutation (converts      |
+ *   |                      |  CounterMutation to      |
+ *   |                      |  Mutation w/ ID)         |
+ *   |                      |                          |
+ *   |                      |--MUTATION_REQ----------->|
+ *   |                      |  (Mutation result)       |
+ *   |                      |                          |
+ *   |                      |                          |--Apply locally
+ *   |                      |                          |
+ *   |                      |<------MUTATION_RSP-------|
+ *   |                      |                          |
+ *   |<--Write Response-----|                          |
+ *   |  (when CL satisfied) |                          |
+ *   |                      |--Mark witnessed--------->|
+ * </pre>
+ *
+ * <p><b>Key Points:</b>
+ * <ul>
+ *   <li>Coordinator is a replica, so it acts as the leader</li>
+ *   <li>Coordinator assigns a MutationId using {@link 
MutationTrackingService#nextMutationId}</li>
+ *   <li>Coordinator applies counter mutation locally, which converts 
CounterMutation to regular Mutation</li>
+ *   <li>Coordinator sends the resulting Mutation (NOT CounterMutation) to 
other replicas</li>
+ *   <li>Replicas respond directly to coordinator for consistency level</li>
+ *   <li>Coordinator tracks witnessing for all replicas including itself</li>
+ *   <li>Client response sent when CL is satisfied</li>
+ * </ul>
+ *
+ * @see ForwardedWrite for the flow when coordinator is NOT a replica
+ * @see MutationTrackingService for mutation ID assignment and witnessing
+ */
 public class TrackedWriteRequest
 {
     private static final Logger logger = 
LoggerFactory.getLogger(TrackedWriteRequest.class);
@@ -78,7 +164,7 @@ public class TrackedWriteRequest
      * @param requestTime object holding times when request got enqueued and 
started execution
      */
     public static AbstractWriteResponseHandler<?> perform(
-        Mutation mutation, ConsistencyLevel consistencyLevel, 
Dispatcher.RequestTime requestTime)
+        IMutation mutation, ConsistencyLevel consistencyLevel, 
Dispatcher.RequestTime requestTime)
     {
         Tracing.trace("Determining replicas for mutation");
 
@@ -92,33 +178,61 @@ public class TrackedWriteRequest
 
         if (plan.lookup(FBUtilities.getBroadcastAddressAndPort()) == null)
         {
-            if (logger.isTraceEnabled())
-                logger.trace("Remote tracked request {} {}", mutation, plan);
+            logger.trace("Remote tracked request {} {}", mutation, plan);
             writeMetrics.remoteRequests.mark();
-            return ForwardedWrite.forwardMutation(mutation, plan, rs, 
requestTime);
+            return ForwardedWrite.forward(mutation, plan, rs, requestTime);
         }
 
-        if (logger.isTraceEnabled())
-            logger.trace("Local tracked request {} {}", mutation, plan);
+        logger.trace("Local tracked request {} {}", mutation, plan);
         writeMetrics.localRequests.mark();
+
         MutationId id = 
MutationTrackingService.instance.nextMutationId(keyspaceName, token);
         mutation = mutation.withMutationId(id);
 
         if (logger.isTraceEnabled())
+        {
             logger.trace("Write replication plan for mutation {}: live={}, 
pending={}, all={}",
                          id, plan.live(), plan.pending(), plan.contacts());
+        }
 
-        TrackedWriteResponseHandler handler =
-            TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, 
null, WriteType.SIMPLE, null, requestTime), id);
-        applyLocallyAndSendToReplicas(mutation, plan, handler);
+        final TrackedWriteResponseHandler handler;
+        if (mutation instanceof CounterMutation)
+        {
+            handler = 
TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, null, 
WriteType.COUNTER, null, requestTime), id);
+            applyCounterMutationLocally((CounterMutation) mutation, plan, 
handler);
+        }
+        else
+        {
+            handler = 
TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, null, 
WriteType.SIMPLE, null, requestTime), id);
+            applyLocallyAndSendToReplicas((Mutation) mutation, plan, handler);
+        }
         return handler;
     }
 
     public static void applyLocallyAndSendToReplicas(Mutation mutation, 
ReplicaPlan.ForWrite plan, TrackedWriteResponseHandler handler)
     {
-        String localDataCenter = 
DatabaseDescriptor.getLocator().local().datacenter;
+        applyMutationLocally(mutation, handler);
+        sendToReplicas(mutation, plan, handler, null);
+    }
+
+    /**
+     * Sends a mutation to every live replica other than this node, which must already have applied it locally.
+     * Handles grouping replicas by DC, sending messages, and tracking remote 
replicas.
+     *
+     * @param mutation the mutation with assigned ID to send to replicas
+     * @param plan the replica plan
+     * @param handler the response handler (can be TrackedWriteResponseHandler 
or LeaderCallback)
+     * @param coordinatorAckInfo optional coordinator info for forwarded 
writes (null for local coordinator)
+     */
+    public static void sendToReplicas(Mutation mutation,
+                                      ReplicaPlan.ForWrite plan,
+                                      RequestCallback<NoPayload> handler,
+                                      ForwardedWrite.CoordinatorAckInfo 
coordinatorAckInfo)
+    {
+        Preconditions.checkArgument(handler instanceof 
TrackedWriteResponseHandler || handler instanceof ForwardedWrite.LeaderCallback,
+                                    "Handler must be 
TrackedWriteResponseHandler or LeaderCallback");
 
-        boolean applyLocally = false;
+        String localDataCenter = 
DatabaseDescriptor.getLocator().local().datacenter;
 
         // this DC replicas
         List<Replica> localDCReplicas = null;
@@ -126,7 +240,7 @@ public class TrackedWriteRequest
         // extra-DC, grouped by DC
         Map<String, List<Replica>> remoteDCReplicas = null;
 
-        // only need to create a Message for non-local writes
+        // create a Message for non-local writes
         Message<Mutation> message = null;
 
         // For performance, Mutation caches serialized buffers that are 
computed lazily in serializedBuffer(). That
@@ -138,28 +252,39 @@ public class TrackedWriteRequest
         // the current version, but it's just an optimization, and we're ok 
not optimizing for mixed-version clusters.
         Mutation.serializer.prepareSerializedBuffer(mutation, 
MessagingService.current_version);
 
+        // Extract request time from handler
+        Dispatcher.RequestTime requestTime = getRequestTime(handler);
+
+        boolean foundSelf = false;
         for (Replica destination : plan.contacts())
         {
             if (!plan.isAlive(destination))
             {
-                if (logger.isTraceEnabled())
-                    logger.trace("Skipping dead replica {} for mutation {}", 
destination, mutation.id());
-                handler.expired(); // immediately mark the response as expired 
since the request will not be sent
+                logger.trace("Skipping dead replica {} for mutation {}", 
destination, mutation.id());
+                // Only call expired() for AbstractWriteResponseHandler (not 
for LeaderCallback)
+                if (handler instanceof AbstractWriteResponseHandler)
+                    ((AbstractWriteResponseHandler<?>) handler).expired(); // 
immediately mark the response as expired since the request will not be sent
                 continue;
             }
 
             if (destination.isSelf())
             {
-                applyLocally = true;
+                foundSelf = true; // Mutation was already applied locally
                 continue;
             }
 
             if (message == null)
             {
-                message = Message.builder(MUTATION_REQ, mutation)
-                                 .withRequestTime(handler.getRequestTime())
-                                 .withFlag(MessageFlag.CALL_BACK_ON_FAILURE)
-                                 .build();
+                Message.Builder<Mutation> builder = 
Message.builder(MUTATION_REQ, mutation)
+                                                           
.withRequestTime(requestTime)
+                                                           
.withFlag(MessageFlag.CALL_BACK_ON_FAILURE);
+
+                // If this is a forwarded write, include coordinator ack info 
so replicas
+                // know to respond to the original coordinator, not this leader
+                if (coordinatorAckInfo != null)
+                    builder.withParam(ParamType.COORDINATOR_ACK_INFO, 
coordinatorAckInfo);
+
+                message = builder.build();
             }
 
             String dc = 
DatabaseDescriptor.getLocator().location(destination.endpoint()).datacenter;
@@ -174,16 +299,12 @@ public class TrackedWriteRequest
             {
                 if (remoteDCReplicas == null)
                     remoteDCReplicas = new HashMap<>();
-
-                List<Replica> replicas = remoteDCReplicas.get(dc);
-                if (replicas == null)
-                    replicas = remoteDCReplicas.computeIfAbsent(dc, ignore -> 
new ArrayList<>(3)); // most DCs will have <= 3 replicas
-                replicas.add(destination);
+                remoteDCReplicas.computeIfAbsent(dc, ignore -> new 
ArrayList<>(3)) // most DCs will have <= 3 replicas
+                                .add(destination);
             }
         }
 
-        Preconditions.checkState(applyLocally); // the coordinator is always a 
replica
-        applyMutationLocally(mutation, handler);
+        Preconditions.checkState(foundSelf, "Coordinator must be a replica");
 
         IntHashSet remoteReplicas = null;
         if (localDCReplicas != null || remoteDCReplicas != null)
@@ -193,9 +314,13 @@ public class TrackedWriteRequest
         {
             for (Replica replica : localDCReplicas)
             {
-                if (logger.isTraceEnabled())
-                    logger.trace("Sending mutation {} to local replica {}", 
mutation.id(), replica);
-                MessagingService.instance().sendWriteWithCallback(message, 
replica, handler);
+                logger.trace("Sending mutation {} to local replica {}", 
mutation.id(), replica);
+                // Use appropriate send method based on handler type
+                if (handler instanceof AbstractWriteResponseHandler)
+                    MessagingService.instance().sendWriteWithCallback(message, 
replica, (AbstractWriteResponseHandler<?>) handler);
+                else
+                    MessagingService.instance().sendWithCallback(message, 
replica.endpoint(), handler);
+
                 
remoteReplicas.add(ClusterMetadata.current().directory.peerId(replica.endpoint()).id());
             }
         }
@@ -205,20 +330,69 @@ public class TrackedWriteRequest
             // for each datacenter, send the message to one node to relay the 
write to other replicas
             for (List<Replica> dcReplicas : remoteDCReplicas.values())
             {
-                if (logger.isTraceEnabled())
-                    logger.trace("Sending mutation {} to remote dc replicas 
{}", mutation.id(), dcReplicas);
-                sendMessagesToRemoteDC(message, 
EndpointsForToken.copyOf(mutation.key().getToken(), dcReplicas), handler, null);
+                logger.trace("Sending mutation {} to remote dc replicas {}", 
mutation.id(), dcReplicas);
+                sendMessagesToRemoteDC(message, 
EndpointsForToken.copyOf(mutation.key().getToken(), dcReplicas), handler, 
coordinatorAckInfo);
                 for (Replica replica : dcReplicas)
                     
remoteReplicas.add(ClusterMetadata.current().directory.peerId(replica.endpoint()).id());
             }
         }
 
         if (remoteReplicas != null)
-        {
-            if (logger.isTraceEnabled())
-                logger.trace("Sending mutation {} to remote replicas {}", 
mutation.id(), remoteReplicas);
             MutationTrackingService.instance.sentWriteRequest(mutation, 
remoteReplicas);
+    }
+
+    /*
+     * Send the message to one replica of targets (chosen by pickReplica), and 
have it forward the message to the others in its DC
+     */
+    static void sendMessagesToRemoteDC(Message<? extends IMutation> message,
+                                       EndpointsForToken targets,
+                                       RequestCallback<NoPayload> handler,
+                                       ForwardedWrite.CoordinatorAckInfo ackTo)
+    {
+        final Replica target;
+
+        if (targets.size() > 1)
+        {
+            target = pickReplica(targets);
+            EndpointsForToken forwardToReplicas = targets.filter(r -> r != 
target, targets.size());
+
+            for (Replica replica : forwardToReplicas)
+            {
+                if (handler instanceof TrackedWriteResponseHandler)
+                    
MessagingService.instance().callbacks.addWithExpiration((TrackedWriteResponseHandler)
 handler, message, replica);
+                else if (handler instanceof ForwardedWrite.LeaderCallback)
+                    
MessagingService.instance().callbacks.addWithExpiration(handler, message, 
replica.endpoint());
+                else
+                    throw new IllegalStateException();
+                logger.trace("Adding FWD message to {}@{}", message.id(), 
replica);
+            }
+
+            // starting with 4.0, use the same message id for all replicas
+            long[] messageIds = new long[forwardToReplicas.size()];
+            Arrays.fill(messageIds, message.id());
+
+            message = message.withForwardTo(new 
ForwardingInfo(forwardToReplicas.endpointList(), messageIds));
+        }
+        else
+        {
+            target = targets.get(0);
         }
+        if (ackTo != null)
+            message = message.withParam(ParamType.COORDINATOR_ACK_INFO, ackTo);
+
+        Tracing.trace("Sending mutation to remote replica {}", target);
+        if (handler instanceof ForwardedWrite.LeaderCallback)
+            
MessagingService.instance().sendForwardedWriteWithCallback(message, target, 
(ForwardedWrite.LeaderCallback) handler);
+        else
+            MessagingService.instance().sendWriteWithCallback(message, target, 
(AbstractWriteResponseHandler<?>) handler);
+        logger.trace("Sending message to {}@{}", message.id(), target);
+    }
+
+    private static Replica pickReplica(EndpointsForToken targets)
+    {
+        EndpointsForToken healthy = targets.filter(r -> 
DynamicEndpointSnitch.getSeverity(r.endpoint()) == 0);
+        EndpointsForToken select = healthy.isEmpty() ? targets : healthy;
+        return select.get(ThreadLocalRandom.current().nextInt(0, 
select.size()));
     }
 
     static void applyMutationLocally(Mutation mutation, 
RequestCallback<NoPayload> handler)
@@ -227,6 +401,13 @@ public class TrackedWriteRequest
         Stage.MUTATION.maybeExecuteImmediately(new 
LocalMutationRunnable(mutation, handler));
     }
 
+    static void applyCounterMutationLocally(CounterMutation counterMutation,
+                                            ReplicaPlan.ForWrite plan,
+                                            TrackedWriteResponseHandler 
handler)
+    {
+        Stage.COUNTER_MUTATION.maybeExecuteImmediately(new 
LocalCounterMutationRunnable(counterMutation, plan, handler));
+    }
+
     private static class LocalMutationRunnable implements 
DebuggableTask.RunnableDebuggableTask
     {
         private final Mutation mutation;
@@ -239,20 +420,11 @@ public class TrackedWriteRequest
             this.handler = handler;
         }
 
-        private Dispatcher.RequestTime getRequestTime()
-        {
-            if (handler instanceof TrackedWriteResponseHandler)
-                return ((TrackedWriteResponseHandler) 
handler).getRequestTime();
-            if (handler instanceof ForwardedWrite.LeaderCallback)
-                return ((ForwardedWrite.LeaderCallback) 
handler).getRequestTime();
-            throw new IllegalStateException();
-        }
-
         @Override
         public final void run()
         {
             long now = MonotonicClock.Global.approxTime.now();
-            long deadline = 
getRequestTime().computeDeadline(MUTATION_REQ.expiresAfterNanos());
+            long deadline = 
getRequestTime(handler).computeDeadline(MUTATION_REQ.expiresAfterNanos());
 
             if (now > deadline)
             {
@@ -277,13 +449,13 @@ public class TrackedWriteRequest
         @Override
         public long creationTimeNanos()
         {
-            return getRequestTime().enqueuedAtNanos();
+            return getRequestTime(handler).enqueuedAtNanos();
         }
 
         @Override
         public long startTimeNanos()
         {
-            return getRequestTime().startedAtNanos();
+            return getRequestTime(handler).startedAtNanos();
         }
 
         @Override
@@ -295,57 +467,76 @@ public class TrackedWriteRequest
         }
     }
 
-    /*
-     * Send the message to the first replica of targets, and have it forward 
the message to others in its DC
-     */
-    static void sendMessagesToRemoteDC(Message<? extends IMutation> message,
-                                       EndpointsForToken targets,
-                                       RequestCallback<NoPayload> handler,
-                                       ForwardedWrite.CoordinatorAckInfo ackTo)
+    private static class LocalCounterMutationRunnable implements 
DebuggableTask.RunnableDebuggableTask
     {
-        final Replica target;
+        private final CounterMutation counterMutation;
+        private final ReplicaPlan.ForWrite plan;
+        private final TrackedWriteResponseHandler handler;
 
-        if (targets.size() > 1)
+        LocalCounterMutationRunnable(CounterMutation counterMutation, 
ReplicaPlan.ForWrite plan, TrackedWriteResponseHandler handler)
         {
-            target = pickReplica(targets);
-            EndpointsForToken forwardToReplicas = targets.filter(r -> r != 
target, targets.size());
+            this.counterMutation = counterMutation;
+            this.plan = plan;
+            this.handler = handler;
+        }
 
-            for (Replica replica : forwardToReplicas)
+        private Dispatcher.RequestTime getReqestTime()
+        {
+            return handler.getRequestTime();
+        }
+
+        @Override
+        public void run()
+        {
+            long now = MonotonicClock.Global.approxTime.now();
+            long deadline = 
getReqestTime().computeDeadline(COUNTER_MUTATION_REQ.expiresAfterNanos());
+
+            if (now > deadline)
             {
-                if (handler instanceof TrackedWriteResponseHandler)
-                    
MessagingService.instance().callbacks.addWithExpiration((TrackedWriteResponseHandler)
 handler, message, replica);
-                else if (handler instanceof ForwardedWrite.LeaderCallback)
-                    
MessagingService.instance().callbacks.addWithExpiration(handler, message, 
replica.endpoint());
-                else
-                    throw new IllegalStateException();
-                logger.trace("Adding FWD message to {}@{}", message.id(), 
replica);
+                long timeTakenNanos = now - startTimeNanos();
+                
MessagingService.instance().metrics.recordSelfDroppedMessage(COUNTER_MUTATION_REQ,
 timeTakenNanos, NANOSECONDS);
+                return;
             }
 
-            // starting with 4.0, use the same message id for all replicas
-            long[] messageIds = new long[forwardToReplicas.size()];
-            Arrays.fill(messageIds, message.id());
+            try
+            {
+                Mutation result = 
counterMutation.applyCounterMutation(counterMutation.id());
+                handler.onResponse(null);
+                sendToReplicas(result, plan, handler, null);
+            }
+            catch (Exception ex)
+            {
+                if(!(ex instanceof WriteTimeoutException))
+                    logger.error("Failed to apply counter mutation locally:  
", ex);
+                handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), 
RequestFailure.forException(ex));
+            }
+        }
 
-            message = message.withForwardTo(new 
ForwardingInfo(forwardToReplicas.endpointList(), messageIds));
+        @Override
+        public long creationTimeNanos()
+        {
+            return getReqestTime().enqueuedAtNanos();
         }
-        else
+
+        @Override
+        public long startTimeNanos()
         {
-            target = targets.get(0);
+            return getReqestTime().startedAtNanos();
         }
-        if (ackTo != null)
-            message = message.withParam(ParamType.COORDINATOR_ACK_INFO, ackTo);
 
-        Tracing.trace("Sending mutation to remote replica {}", target);
-        if (handler instanceof ForwardedWrite.LeaderCallback)
-            
MessagingService.instance().sendForwardedWriteWithCallback(message, target, 
(ForwardedWrite.LeaderCallback) handler);
-        else
-            MessagingService.instance().sendWriteWithCallback(message, target, 
(AbstractWriteResponseHandler<?>) handler);
-        logger.trace("Sending message to {}@{}", message.id(), target);
+        @Override
+        public String description()
+        {
+            return counterMutation.toString();
+        }
     }
 
-    private static Replica pickReplica(EndpointsForToken targets)
+    private static Dispatcher.RequestTime getRequestTime(RequestCallback<?> 
callback)
     {
-        EndpointsForToken healthy = targets.filter(r -> 
DynamicEndpointSnitch.getSeverity(r.endpoint()) == 0);
-        EndpointsForToken select = healthy.isEmpty() ? targets : healthy;
-        return select.get(ThreadLocalRandom.current().nextInt(0, 
select.size()));
+        if (callback instanceof TrackedWriteResponseHandler)
+            return ((TrackedWriteResponseHandler) callback).getRequestTime();
+        if (callback instanceof ForwardedWrite.LeaderCallback)
+            return ((ForwardedWrite.LeaderCallback) callback).getRequestTime();
+        throw new IllegalStateException();
     }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index cde6ad1d58..2324edce54 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -190,7 +190,6 @@ import 
org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 import static accord.primitives.Txn.Kind.Read;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Iterables.concat;
-import static java.util.Collections.singleton;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.cassandra.db.ConsistencyLevel.SERIAL;
@@ -958,71 +957,6 @@ public class StorageProxy implements StorageProxyMBean
         });
     }
 
-    /**
-     * Performs a coordinated write with mutation tracking.
-     * Assumes that local coordinator is a replica (forwarding implementation 
pending).
-     *
-     * @param mutation
-     * @param consistencyLevel
-     * @param requestTime
-     */
-    public static void mutateWithTracking(Mutation mutation, ConsistencyLevel 
consistencyLevel, Dispatcher.RequestTime requestTime)
-    {
-        try
-        {
-            TrackedWriteRequest.perform(mutation, consistencyLevel, 
requestTime).get();
-        }
-        catch (WriteTimeoutException|WriteFailureException ex)
-        {
-            if (consistencyLevel == ConsistencyLevel.ANY)
-            {
-                // TODO (expected): what exactly?
-            }
-            else
-            {
-                if (ex instanceof WriteFailureException)
-                {
-                    writeMetrics.failures.mark();
-                    writeMetricsForLevel(consistencyLevel).failures.mark();
-                    WriteFailureException fe = (WriteFailureException)ex;
-                    Tracing.trace("Write failure; received {} of {} required 
replies, failed {} requests",
-                                  fe.received, fe.blockFor, 
fe.failureReasonByEndpoint.size());
-                }
-                else
-                {
-                    writeMetrics.timeouts.mark();
-                    writeMetricsForLevel(consistencyLevel).timeouts.mark();
-                    WriteTimeoutException te = (WriteTimeoutException)ex;
-                    Tracing.trace("Write timeout; received {} of {} required 
replies", te.received, te.blockFor);
-                }
-                throw ex;
-            }
-        }
-        catch (UnavailableException e)
-        {
-            writeMetrics.unavailables.mark();
-            writeMetricsForLevel(consistencyLevel).unavailables.mark();
-            Tracing.trace("Unavailable");
-            throw e;
-        }
-        catch (OverloadedException e)
-        {
-            writeMetrics.unavailables.mark();
-            writeMetricsForLevel(consistencyLevel).unavailables.mark();
-            Tracing.trace("Overloaded");
-            throw e;
-        }
-        finally
-        {
-            // We track latency based on request processing time, since the 
amount of time that request spends in the queue
-            // is not a representative metric of replica performance.
-            long latency = nanoTime() - requestTime.startedAtNanos();
-            writeMetrics.addNano(latency);
-            writeMetricsForLevel(consistencyLevel).addNano(latency);
-            updateCoordinatorWriteLatencyTableMetric(singleton(mutation), 
latency);
-        }
-    }
-
     /**
      * Use this method to have these Mutations applied
      * across all replicas. This method will take care
@@ -1322,8 +1256,6 @@ public class StorageProxy implements StorageProxyMBean
         boolean isTracked = 
Schema.instance.getKeyspaceMetadata(mutations.get(0).getKeyspaceName()).params.replicationType.isTracked();
         if (isTracked)
         {
-            if (mutations.stream().anyMatch(m -> m instanceof CounterMutation))
-                throw new InvalidRequestException("Mutation tracking is 
currently unsupported with counters");
             if (augmented != null)
                 throw new InvalidRequestException("Mutation tracking is 
currently unsupported with triggers");
             if (mutateAtomically)
@@ -1366,7 +1298,7 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     for (IMutation trackedMutation : trackedMutations)
                     {
-                        
trackedHandlers.add(TrackedWriteRequest.perform((Mutation) trackedMutation, 
consistencyLevel, requestTime));
+                        
trackedHandlers.add(TrackedWriteRequest.perform(trackedMutation, 
consistencyLevel, requestTime));
                     }
                 }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/TrackedCounterForwardingTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/TrackedCounterForwardingTest.java
new file mode 100644
index 0000000000..656b3be4f5
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/TrackedCounterForwardingTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+
+/**
+ * Tests the case where a coordinator that is NOT a replica MUST forward
+ * the counter write to a replica leader.
+ *
+ * With 6 nodes and RF=3, each partition is replicated on only half of the nodes,
+ * so writing from every node is expected to cover both local and forwarded coordinators.
+ */
+public class TrackedCounterForwardingTest extends TestBaseImpl
+{
+    @Test
+    public void testForwardedTrackedCounterWrites() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(6)
+                                      .withConfig(c -> c.with(GOSSIP, 
NATIVE_PROTOCOL))
+                                      .start())
+        {
+            cluster.schemaChange("CREATE KEYSPACE k WITH replication = " +
+                                 "{'class': 'SimpleStrategy', 
'replication_factor': 3} AND replication_type='tracked'");
+
+            cluster.schemaChange("CREATE TABLE k.counters (pk int PRIMARY KEY, 
count counter)");
+
+            ConsistencyLevel cl = ConsistencyLevel.QUORUM;
+
+            // Test counter writes from all nodes
+            // Some will be local (coordinator is replica), some will be 
forwarded (coordinator not replica)
+            for (int coordinatorNode = 1; coordinatorNode <= 6; 
coordinatorNode++)
+            {
+                ICoordinator coordinator = 
cluster.coordinator(coordinatorNode);
+                int pk = coordinatorNode * 100;
+
+                // Increment
+                coordinator.execute("UPDATE k.counters SET count = count + 5 
WHERE pk = ?", cl, pk);
+                assertRows(coordinator.execute("SELECT count FROM k.counters 
WHERE pk = ?", cl, pk), row(5L));
+
+                // Increment again
+                coordinator.execute("UPDATE k.counters SET count = count + 3 
WHERE pk = ?", cl, pk);
+                assertRows(coordinator.execute("SELECT count FROM k.counters 
WHERE pk = ?", cl, pk), row(8L));
+            }
+
+            // Verify all nodes can read all counters
+            for (int node = 1; node <= 6; node++)
+            {
+                ICoordinator coordinator = cluster.coordinator(node);
+                for (int pk = 100; pk <= 600; pk += 100)
+                {
+                    assertRows(coordinator.execute("SELECT count FROM 
k.counters WHERE pk = ?", ConsistencyLevel.ONE, pk),
+                               row(8L));
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/test/unit/org/apache/cassandra/replication/TrackedCounterWriteTest.java 
b/test/unit/org/apache/cassandra/replication/TrackedCounterWriteTest.java
new file mode 100644
index 0000000000..4b49d52fc1
--- /dev/null
+++ b/test/unit/org/apache/cassandra/replication/TrackedCounterWriteTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.replication;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.CounterMutation;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.ReplicationType;
+import org.apache.cassandra.service.AbstractWriteResponseHandler;
+import org.apache.cassandra.transport.Dispatcher;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Unit tests for Token-Aware (Normal Path) Counter Mutations for Mutation 
Tracking
+ * In these tests, the coordinator IS a replica.
+ */
+public class TrackedCounterWriteTest
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(TrackedCounterWriteTest.class);
+
+    private static final String KEYSPACE_TRACKED = "TrackedCounterTest";
+    private static final String CF_COUNTER = "Counter1";
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+
+        SchemaLoader.createKeyspace(KEYSPACE_TRACKED,
+                                    KeyspaceParams.simple(3, 
ReplicationType.tracked),
+                                    SchemaLoader.counterCFMD(KEYSPACE_TRACKED, 
CF_COUNTER));
+    }
+
+    /**
+     * Tests that counter writes are properly tracked when the coordinator is 
a replica.
+     * Verifies that:
+     * 1. CounterMutation initially has no ID before perform()
+     * 2. TrackedWriteRequest.perform() successfully processes the mutation
+     * 3. The counter value is correctly incremented
+     */
+    @Test
+    public void testTrackedCounterWrite_CoordinatorIsReplica() throws Exception
+    {
+        ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE_TRACKED).getColumnFamilyStore(CF_COUNTER);
+        cfs.truncateBlocking();
+
+        ColumnMetadata cDef = 
cfs.metadata().getColumn(ByteBufferUtil.bytes("val"));
+
+        Mutation m = new RowUpdateBuilder(cfs.metadata(), 
System.currentTimeMillis(), "testkey1")
+                         .clustering("cc")
+                         .add("val", 5L)
+                         .build();
+
+        CounterMutation counterMutation = new CounterMutation(m, 
ConsistencyLevel.ONE);
+
+        assertTrue("CounterMutation should not have an ID before perform()",
+                    counterMutation.id().isNone());
+
+        AbstractWriteResponseHandler<?> handler = TrackedWriteRequest.perform(
+            counterMutation,
+            ConsistencyLevel.ONE,
+            Dispatcher.RequestTime.forImmediateExecution()
+        );
+        assertNotNull("Handler should not be null", handler);
+
+        Thread.sleep(100); // Waiting for async operations
+
+        Row row = 
Util.getOnlyRow(Util.cmd(cfs).includeRow("cc").columns("val").build());
+        long counterValue = CounterContext.instance().total(row.getCell(cDef));
+        assertEquals("Counter should be incremented to 5", 5L, counterValue);
+    }
+
+    /**
+     * Tests that multiple counter updates on the same key accumulate 
correctly with tracking.
+     * Verifies that:
+     * 1. Multiple sequential increments/decrements are properly tracked
+     * 2. Counter values accumulate correctly across updates
+     */
+    @Test
+    public void testTrackedCounterWrite_MultipleIncrements() throws Exception
+    {
+        ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE_TRACKED).getColumnFamilyStore(CF_COUNTER);
+        cfs.truncateBlocking();
+
+        ColumnMetadata cDef = 
cfs.metadata().getColumn(ByteBufferUtil.bytes("val"));
+
+        performCounterUpdate(cfs, "testkey2", 3L); // First increment (+3)
+
+        Row row = 
Util.getOnlyRow(Util.cmd(cfs).includeRow("cc").columns("val").build());
+        long value = CounterContext.instance().total(row.getCell(cDef));
+        assertEquals("Counter should be 3", 3L, value);
+
+        performCounterUpdate(cfs, "testkey2", 7L); // Second increment (+7)
+
+        row = 
Util.getOnlyRow(Util.cmd(cfs).includeRow("cc").columns("val").build());
+        value = CounterContext.instance().total(row.getCell(cDef));
+        assertEquals("Counter should be 10", 10L, value);
+
+        performCounterUpdate(cfs, "testkey2", -4L); // Decrement (-4)
+
+        row = 
Util.getOnlyRow(Util.cmd(cfs).includeRow("cc").columns("val").build());
+        value = CounterContext.instance().total(row.getCell(cDef));
+        assertEquals("Counter should be 6", 6L, value);
+    }
+
+    /**
+     * Performs a counter update, then sleeps 50 ms so the async write can apply
+     */
+    private void performCounterUpdate(ColumnFamilyStore cfs, String key, long 
delta) throws Exception
+    {
+        Mutation m = new RowUpdateBuilder(cfs.metadata(), 
System.currentTimeMillis(), key)
+                         .clustering("cc")
+                         .add("val", delta)
+                         .build();
+
+        CounterMutation counterMutation = new CounterMutation(m, 
ConsistencyLevel.ONE);
+
+        TrackedWriteRequest.perform(
+            counterMutation,
+            ConsistencyLevel.ONE,
+            Dispatcher.RequestTime.forImmediateExecution()
+        );
+
+        Thread.sleep(50);
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to