This is an automated email from the ASF dual-hosted git repository.
aleksey pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-45-mutation-tracking by
this push:
new a8f2ce44a3 Add support for counters to mutation tracking
a8f2ce44a3 is described below
commit a8f2ce44a3837790ff4fd201e4a8b6ef70617830
Author: Aparna Naik <[email protected]>
AuthorDate: Mon Dec 22 13:04:02 2025 -0800
Add support for counters to mutation tracking
patch by Aparna Naik; reviewed by Aleksey Yeschenko for CASSANDRA-20959
---
CHANGES.txt | 1 +
.../org/apache/cassandra/db/CounterMutation.java | 18 +-
.../cassandra/db/CounterMutationVerbHandler.java | 10 +
.../cassandra/replication/ForwardedWrite.java | 221 ++++++++++++-
.../cassandra/replication/TrackedWriteRequest.java | 361 ++++++++++++++++-----
.../org/apache/cassandra/service/StorageProxy.java | 70 +---
.../test/TrackedCounterForwardingTest.java | 83 +++++
.../replication/TrackedCounterWriteTest.java | 160 +++++++++
8 files changed, 762 insertions(+), 162 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 8346d679fb..3a7c72353d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
cep-45-mutation-tracking
+ * Add support for counters to mutation tracking (CASSANDRA-20959)
* CEP-45: Query forwarding (CASSANDRA-20309)
* Fix mutation tracking startup (CASSANDRA-20540)
* Mutation tracking journal integration, read, and write path
(CASSANDRA-20304, CASSANDRA-20305, CASSANDRA-20308)
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java
b/src/java/org/apache/cassandra/db/CounterMutation.java
index 5d6c604533..2f1ab4eabe 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -161,7 +161,23 @@ public class CounterMutation implements IMutation
*/
public Mutation applyCounterMutation() throws WriteTimeoutException
{
- Mutation.PartitionUpdateCollector resultBuilder = new
Mutation.PartitionUpdateCollector(id(), getKeyspaceName(), key());
+ return applyCounterMutation(null);
+ }
+
+ /**
+ * Applies the counter mutation with an optional mutation ID for tracked keyspaces.
+ *
+ * For tracked keyspaces, the mutation ID is assigned to the concrete result BEFORE
+ * it is applied, ensuring the concrete counter values (not the operation) are
+ * journaled and tracked with the ID.
+ *
+ * @param mutationId the mutation ID to assign to the concrete result, or null for non-tracked
+ * @return the applied resulting Mutation (with ID if provided)
+ */
+ public Mutation applyCounterMutation(MutationId mutationId) throws
WriteTimeoutException
+ {
+ MutationId idToUse = (mutationId != null && !mutationId.isNone()) ?
mutationId : id();
+ Mutation.PartitionUpdateCollector resultBuilder = new
Mutation.PartitionUpdateCollector(idToUse, getKeyspaceName(), key());
Keyspace keyspace = Keyspace.open(getKeyspaceName());
List<Lock> locks = new ArrayList<>();
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index d0897afdd5..6d12551c6b 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.replication.ForwardedWrite;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.transport.Dispatcher;
@@ -38,6 +39,15 @@ public class CounterMutationVerbHandler extends
AbstractMutationVerbHandler<Coun
final CounterMutation cm = message.payload;
logger.trace("Applying forwarded {}", cm);
+ Keyspace keyspace = Keyspace.open(cm.getKeyspaceName());
+
+ if (keyspace.getMetadata().useMutationTracking())
+ {
+ logger.trace("Applying tracked forwarded counter mutation {}", cm);
+ ForwardedWrite.applyForwardedCounterMutation(cm, message);
+ return;
+ }
+
String localDataCenter =
DatabaseDescriptor.getLocator().local().datacenter;
// We should not wait for the result of the write in this thread,
// otherwise we could have a distributed deadlock between replicas
diff --git a/src/java/org/apache/cassandra/replication/ForwardedWrite.java
b/src/java/org/apache/cassandra/replication/ForwardedWrite.java
index e5e2bf8d8b..ba4aecb002 100644
--- a/src/java/org/apache/cassandra/replication/ForwardedWrite.java
+++ b/src/java/org/apache/cassandra/replication/ForwardedWrite.java
@@ -59,9 +59,98 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
/**
- * For a forwarded write there are 2 nodes involved in coordination, a
coordinator and a leader. The coordinator is the
- * node that the client is communicating with, and the leader is the mutation
replica that is handling the mutation
+ * Handles tracked writes where the coordinator is NOT a replica for the write.
+ *
+ * <p>For a forwarded write there are 2 nodes involved in coordination: a
coordinator and a leader. The coordinator is
+ * the node that the client is communicating with, and the leader is the
mutation replica that is handling the mutation
* tracking for that write.
+ *
+ * <h2>Request/Response Flow</h2>
+ *
+ * <h3>Regular Writes (Mutation)</h3>
+ *
+ * <pre>
+ * Client                 Coordinator           Leader Replica        Other Replicas
+ *    |                        |                       |                     |
+ *    |---Write Request------->|                       |                     |
+ *    |                        |                       |                     |
+ *    |                        |--FORWARD_WRITE_REQ--->|                     |
+ *    |                        |  (Mutation w/o ID)    |                     |
+ *    |                        |                       |                     |
+ *    |                        |                       |--Assign MutationId  |
+ *    |                        |                       |                     |
+ *    |                        |                       |--Apply locally----->|
+ *    |                        |                       |  (with ID)          |
+ *    |                        |                       |                     |
+ *    |                        |                       |--MUTATION_REQ------>|
+ *    |                        |                       |  (Mutation w/ ID +  |
+ *    |                        |                       |   CoordinatorAck)   |
+ *    |                        |                       |                     |
+ *    |                        |                       |                     |--Apply locally
+ *    |                        |                       |                     |
+ *    |                        |<------MUTATION_RSP (for CL)-----------------|
+ *    |                        |                       |<-MUTATION_RSP (tracking)-|
+ *    |<--Write Response-------|                       |                     |
+ *    |  (when CL satisfied)   |                       |                     |
+ *    |                        |                       |--Mark witnessed---->|
+ * </pre>
+ *
+ * <p><b>Key Points:</b>
+ * <ul>
+ * <li>Coordinator selects a leader replica based on proximity and liveness</li>
+ * <li>Coordinator sends mutation WITHOUT an ID to the leader</li>
+ * <li>Leader assigns a MutationId using {@link MutationTrackingService#nextMutationId}</li>
+ * <li>Leader applies mutation locally and forwards to other replicas</li>
+ * <li>All replicas respond to coordinator for consistency level (using CoordinatorAckInfo)</li>
+ * <li>Other replicas also respond to leader for mutation tracking/witnessing</li>
+ * <li>Coordinator waits for CL responses before responding to client</li>
+ * </ul>
+ *
+ * <h3>Counter Writes (CounterMutation)</h3>
+ *
+ * <pre>
+ * Client                 Coordinator           Leader Replica        Other Replicas
+ *    |                        |                       |                     |
+ *    |---Counter Write------->|                       |                     |
+ *    |                        |                       |                     |
+ *    |                        |--COUNTER_MUTATION_REQ>|                     |
+ *    |                        |  (CounterMutation w/o |                     |
+ *    |                        |   ID)                 |                     |
+ *    |                        |                       |                     |
+ *    |                        |                       |--Assign MutationId  |
+ *    |                        |                       |                     |
+ *    |                        |                       |--Apply counter----->|
+ *    |                        |                       |  mutation (converts |
+ *    |                        |                       |  CounterMutation to |
+ *    |                        |                       |  Mutation w/ ID)    |
+ *    |                        |                       |                     |
+ *    |                        |                       |--MUTATION_REQ------>|
+ *    |                        |                       |  (Mutation result + |
+ *    |                        |                       |   CoordinatorAck)   |
+ *    |                        |                       |                     |
+ *    |                        |                       |                     |--Apply locally
+ *    |                        |                       |                     |
+ *    |                        |<------MUTATION_RSP (for CL)-----------------|
+ *    |                        |                       |<-MUTATION_RSP (tracking)-|
+ *    |<--Write Response-------|                       |                     |
+ *    |  (when CL satisfied)   |                       |                     |
+ *    |                        |                       |--Mark witnessed---->|
+ * </pre>
+ *
+ * <p><b>Key Points:</b>
+ * <ul>
+ * <li>Coordinator selects a counter leader replica (prefers local DC)</li>
+ * <li>Coordinator sends CounterMutation WITHOUT an ID to the leader</li>
+ * <li>Leader assigns a MutationId using {@link MutationTrackingService#nextMutationId}</li>
+ * <li>Leader applies counter mutation locally, which converts CounterMutation to a regular Mutation</li>
+ * <li>Leader forwards the resulting Mutation (NOT CounterMutation) to other replicas</li>
+ * <li>All replicas respond to coordinator for consistency level (using CoordinatorAckInfo)</li>
+ * <li>Other replicas also respond to leader for mutation tracking/witnessing</li>
+ * <li>Coordinator waits for CL responses before responding to client</li>
+ * </ul>
+ *
+ * @see TrackedWriteRequest for the flow when coordinator IS a replica
+ * @see MutationTrackingService for mutation ID assignment and witnessing
*/
public class ForwardedWrite
{
@@ -173,11 +262,8 @@ public class ForwardedWrite
{
if (remoteDCReplicas == null)
remoteDCReplicas = new HashMap<>();
-
- List<Replica> messages = remoteDCReplicas.get(dc);
- if (messages == null)
- messages = remoteDCReplicas.computeIfAbsent(dc, ignore
-> new ArrayList<>(3)); // most DCs will have <= 3 replicas
- messages.add(replica);
+ remoteDCReplicas.computeIfAbsent(dc, ignore -> new
ArrayList<>(3)) // most DCs will have <= 3 replicas
+ .add(replica);
}
}
@@ -241,6 +327,127 @@ public class ForwardedWrite
return handler;
}
+ /**
+ * Forward a tracked counter mutation to a replica leader for processing.
+ * The leader will assign a mutation ID, apply the counter mutation, and
+ * replicate the resulting mutation to other replicas.
+ */
+ public static AbstractWriteResponseHandler<Object>
forwardCounterMutation(CounterMutation counterMutation,
+
ReplicaPlan.ForWrite plan,
+
AbstractReplicationStrategy strategy,
+
Dispatcher.RequestTime requestTime)
+ {
+ Preconditions.checkArgument(counterMutation.id().isNone(),
"CounterMutation should not have an ID when forwarding");
+
+ ClusterMetadata cm = ClusterMetadata.current();
+ String localDataCenter =
DatabaseDescriptor.getLocator().local().datacenter;
+
+ // Find the leader replica - prefer local DC replicas for counters
+ Replica leader;
+ try
+ {
+ leader = ReplicaPlans.findCounterLeaderReplica(cm,
counterMutation.getKeyspaceName(),
+
counterMutation.key(),
+ localDataCenter,
+
counterMutation.consistency());
+ }
+ catch (Exception e)
+ {
+ logger.error("Failed to find counter leader replica for tracked
write", e);
+ throw e;
+ }
+
+ Preconditions.checkState(!leader.isSelf(), "Leader should not be self
when forwarding counter mutation");
+ logger.trace("Forwarding tracked counter mutation to leader replica
{}", leader);
+
+ // Create response handler for all replicas
+ AbstractWriteResponseHandler<Object> handler =
strategy.getWriteResponseHandler(plan, null, WriteType.COUNTER, null,
requestTime);
+
+ // Add callbacks for all live replicas to respond directly to
coordinator
+ Message<CounterMutation> forwardMessage =
Message.outWithRequestTime(Verb.COUNTER_MUTATION_REQ, counterMutation,
requestTime);
+ for (Replica replica : plan.contacts())
+ {
+ if (plan.isAlive(replica))
+ {
+ logger.trace("Adding forwarding callback for tracked counter
response from {} id {}", replica, forwardMessage.id());
+
MessagingService.instance().callbacks.addWithExpiration(handler,
forwardMessage, replica);
+ }
+ else
+ {
+ handler.expired();
+ }
+ }
+
+ // Send the counter mutation to the leader
+ MessagingService.instance().send(forwardMessage, leader.endpoint());
+
+ return handler;
+ }
+
+ /**
+ * Forward a mutation to a replica leader for processing.
+ * Dispatches to the appropriate method based on mutation type.
+ *
+ * @param mutation the mutation to forward (can be Mutation or
CounterMutation)
+ * @param plan the replica plan
+ * @param strategy the replication strategy
+ * @param requestTime the request time
+ * @return the write response handler
+ */
+ public static AbstractWriteResponseHandler<Object> forward(IMutation
mutation,
+
ReplicaPlan.ForWrite plan,
+
AbstractReplicationStrategy strategy,
+
Dispatcher.RequestTime requestTime)
+ {
+ if (mutation instanceof CounterMutation)
+ return forwardCounterMutation((CounterMutation) mutation, plan,
strategy, requestTime);
+ else
+ return forwardMutation((Mutation) mutation, plan, strategy,
requestTime);
+ }
+
+ /**
+ * Apply a forwarded tracked counter mutation on the leader replica.
+ * Called by CounterMutationVerbHandler when receiving a forwarded counter
write.
+ * <p>
+ * This method:
+ * 1. Creates CoordinatorAckInfo from the incoming message
+ * 2. Creates a LeaderCallback to track responses from replicas
+ * 3. Applies counter mutation locally with generated mutation ID
+ * 4. Forwards result (Mutation not CounterMutation) to other replicas
with CoordinatorAckInfo and LeaderCallback
+ * 5. Sends leader's response back to coordinator
+ *
+ * @param counterMutation the counter mutation to apply
+ * @param message the original message (contains coordinator address and
message ID)
+ */
+ public static void applyForwardedCounterMutation(CounterMutation
counterMutation, Message<CounterMutation> message)
+ {
+ CoordinatorAckInfo coordinatorAckInfo =
CoordinatorAckInfo.toCoordinator(message.from(), message.id());
+
+ String keyspaceName = counterMutation.getKeyspaceName();
+ Token token = counterMutation.key().getToken();
+ Keyspace ks = Keyspace.open(keyspaceName);
+ ReplicaPlan.ForWrite plan = ReplicaPlans.forWrite(ks,
counterMutation.consistency(), token, ReplicaPlans.writeAll);
+
+ MutationId id =
MutationTrackingService.instance.nextMutationId(keyspaceName, token);
+
+ logger.trace("Forwarded counter mutation {}: applying locally with ID
and forwarding to other replicas", id);
+
+ // Create LeaderCallback to track when replicas respond, allowing the
leader
+ // to mark the mutation ID as witnessed on each replica proactively
+ LeaderCallback leaderCallback = new LeaderCallback(id,
coordinatorAckInfo);
+
+ // Apply counter mutation with ID to get result
+ Mutation result = counterMutation.applyCounterMutation(id);
+
+ // Apply locally using the leader callback
+ TrackedWriteRequest.applyMutationLocally(result, leaderCallback);
+
+ // Send result to other replicas with CoordinatorAckInfo and
LeaderCallback
+ // Replicas will respond to both the leader (for witnessing) and the
coordinator (for CL)
+ TrackedWriteRequest.sendToReplicas(result, plan, leaderCallback,
coordinatorAckInfo);
+
+ logger.trace("Tracked counter mutation {} processed, local application
and replication initiated", id);
+ }
+
public static final IVersionedSerializer<MutationRequest> serializer = new
IVersionedSerializer<>()
{
@Override
diff --git a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
index b45eae9f74..e47b8766e6 100644
--- a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
+++ b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.concurrent.DebuggableTask;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.CounterMutation;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
@@ -64,8 +65,93 @@ import org.apache.cassandra.utils.MonotonicClock;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetrics;
+import static org.apache.cassandra.net.Verb.COUNTER_MUTATION_REQ;
import static org.apache.cassandra.net.Verb.MUTATION_REQ;
+/**
+ * Handles tracked writes where the coordinator IS a replica for the write.
+ *
+ * <p>When the coordinator is a replica, it acts as the leader and directly
assigns the MutationId,
+ * applies the mutation locally, and forwards to other replicas. This avoids
the extra network hop
+ * compared to {@link ForwardedWrite}.
+ *
+ * <h2>Request/Response Flow</h2>
+ *
+ * <h3>Regular Writes (Mutation)</h3>
+ *
+ * <pre>
+ * Client           Coordinator/Leader           Other Replicas
+ *    |                      |                          |
+ *    |---Write Request----->|                          |
+ *    |                      |                          |
+ *    |                      |--Assign MutationId       |
+ *    |                      |                          |
+ *    |                      |--Apply locally---------->|
+ *    |                      |  (with ID)               |
+ *    |                      |                          |
+ *    |                      |--MUTATION_REQ----------->|
+ *    |                      |  (Mutation w/ ID)        |
+ *    |                      |                          |
+ *    |                      |                          |--Apply locally
+ *    |                      |                          |
+ *    |                      |<------MUTATION_RSP-------|
+ *    |                      |                          |
+ *    |<--Write Response-----|                          |
+ *    |  (when CL satisfied) |                          |
+ *    |                      |--Mark witnessed--------->|
+ * </pre>
+ *
+ * <p><b>Key Points:</b>
+ * <ul>
+ * <li>Coordinator is a replica, so it acts as the leader</li>
+ * <li>Coordinator assigns a MutationId using {@link MutationTrackingService#nextMutationId}</li>
+ * <li>Coordinator applies mutation locally before sending to other replicas</li>
+ * <li>Coordinator sends mutation WITH ID to other replicas</li>
+ * <li>Replicas respond directly to coordinator for consistency level</li>
+ * <li>Coordinator tracks witnessing for all replicas including itself</li>
+ * <li>Client response sent when CL is satisfied</li>
+ * </ul>
+ *
+ * <h3>Counter Writes (CounterMutation)</h3>
+ *
+ * <pre>
+ * Client           Coordinator/Leader           Other Replicas
+ *    |                      |                          |
+ *    |---Counter Write----->|                          |
+ *    |                      |                          |
+ *    |                      |--Assign MutationId       |
+ *    |                      |                          |
+ *    |                      |--Apply counter---------->|
+ *    |                      |  mutation (converts      |
+ *    |                      |  CounterMutation to      |
+ *    |                      |  Mutation w/ ID)         |
+ *    |                      |                          |
+ *    |                      |--MUTATION_REQ----------->|
+ *    |                      |  (Mutation result)       |
+ *    |                      |                          |
+ *    |                      |                          |--Apply locally
+ *    |                      |                          |
+ *    |                      |<------MUTATION_RSP-------|
+ *    |                      |                          |
+ *    |<--Write Response-----|                          |
+ *    |  (when CL satisfied) |                          |
+ *    |                      |--Mark witnessed--------->|
+ * </pre>
+ *
+ * <p><b>Key Points:</b>
+ * <ul>
+ * <li>Coordinator is a replica, so it acts as the leader</li>
+ * <li>Coordinator assigns a MutationId using {@link MutationTrackingService#nextMutationId}</li>
+ * <li>Coordinator applies counter mutation locally, which converts CounterMutation to regular Mutation</li>
+ * <li>Coordinator sends the resulting Mutation (NOT CounterMutation) to other replicas</li>
+ * <li>Replicas respond directly to coordinator for consistency level</li>
+ * <li>Coordinator tracks witnessing for all replicas including itself</li>
+ * <li>Client response sent when CL is satisfied</li>
+ * </ul>
+ *
+ * @see ForwardedWrite for the flow when coordinator is NOT a replica
+ * @see MutationTrackingService for mutation ID assignment and witnessing
+ */
public class TrackedWriteRequest
{
private static final Logger logger =
LoggerFactory.getLogger(TrackedWriteRequest.class);
@@ -78,7 +164,7 @@ public class TrackedWriteRequest
* @param requestTime object holding times when request got enqueued and
started execution
*/
public static AbstractWriteResponseHandler<?> perform(
- Mutation mutation, ConsistencyLevel consistencyLevel,
Dispatcher.RequestTime requestTime)
+ IMutation mutation, ConsistencyLevel consistencyLevel,
Dispatcher.RequestTime requestTime)
{
Tracing.trace("Determining replicas for mutation");
@@ -92,33 +178,61 @@ public class TrackedWriteRequest
if (plan.lookup(FBUtilities.getBroadcastAddressAndPort()) == null)
{
- if (logger.isTraceEnabled())
- logger.trace("Remote tracked request {} {}", mutation, plan);
+ logger.trace("Remote tracked request {} {}", mutation, plan);
writeMetrics.remoteRequests.mark();
- return ForwardedWrite.forwardMutation(mutation, plan, rs,
requestTime);
+ return ForwardedWrite.forward(mutation, plan, rs, requestTime);
}
- if (logger.isTraceEnabled())
- logger.trace("Local tracked request {} {}", mutation, plan);
+ logger.trace("Local tracked request {} {}", mutation, plan);
writeMetrics.localRequests.mark();
+
MutationId id =
MutationTrackingService.instance.nextMutationId(keyspaceName, token);
mutation = mutation.withMutationId(id);
if (logger.isTraceEnabled())
+ {
logger.trace("Write replication plan for mutation {}: live={},
pending={}, all={}",
id, plan.live(), plan.pending(), plan.contacts());
+ }
- TrackedWriteResponseHandler handler =
- TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan,
null, WriteType.SIMPLE, null, requestTime), id);
- applyLocallyAndSendToReplicas(mutation, plan, handler);
+ final TrackedWriteResponseHandler handler;
+ if (mutation instanceof CounterMutation)
+ {
+ handler =
TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, null,
WriteType.COUNTER, null, requestTime), id);
+ applyCounterMutationLocally((CounterMutation) mutation, plan,
handler);
+ }
+ else
+ {
+ handler =
TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, null,
WriteType.SIMPLE, null, requestTime), id);
+ applyLocallyAndSendToReplicas((Mutation) mutation, plan, handler);
+ }
return handler;
}
public static void applyLocallyAndSendToReplicas(Mutation mutation,
ReplicaPlan.ForWrite plan, TrackedWriteResponseHandler handler)
{
- String localDataCenter =
DatabaseDescriptor.getLocator().local().datacenter;
+ applyMutationLocally(mutation, handler);
+ sendToReplicas(mutation, plan, handler, null);
+ }
+
+ /**
+ * Sends a mutation to all replicas.
+ * Handles grouping replicas by DC, sending messages, and tracking remote
replicas.
+ *
+ * @param mutation the mutation with assigned ID to send to replicas
+ * @param plan the replica plan
+ * @param handler the response handler (can be TrackedWriteResponseHandler
or LeaderCallback)
+ * @param coordinatorAckInfo optional coordinator info for forwarded
writes (null for local coordinator)
+ */
+ public static void sendToReplicas(Mutation mutation,
+ ReplicaPlan.ForWrite plan,
+ RequestCallback<NoPayload> handler,
+ ForwardedWrite.CoordinatorAckInfo
coordinatorAckInfo)
+ {
+ Preconditions.checkArgument(handler instanceof
TrackedWriteResponseHandler || handler instanceof ForwardedWrite.LeaderCallback,
+ "Handler must be
TrackedWriteResponseHandler or LeaderCallback");
- boolean applyLocally = false;
+ String localDataCenter =
DatabaseDescriptor.getLocator().local().datacenter;
// this DC replicas
List<Replica> localDCReplicas = null;
@@ -126,7 +240,7 @@ public class TrackedWriteRequest
// extra-DC, grouped by DC
Map<String, List<Replica>> remoteDCReplicas = null;
- // only need to create a Message for non-local writes
+ // create a Message for non-local writes
Message<Mutation> message = null;
// For performance, Mutation caches serialized buffers that are
computed lazily in serializedBuffer(). That
@@ -138,28 +252,39 @@ public class TrackedWriteRequest
// the current version, but it's just an optimization, and we're ok
not optimizing for mixed-version clusters.
Mutation.serializer.prepareSerializedBuffer(mutation,
MessagingService.current_version);
+ // Extract request time from handler
+ Dispatcher.RequestTime requestTime = getRequestTime(handler);
+
+ boolean foundSelf = false;
for (Replica destination : plan.contacts())
{
if (!plan.isAlive(destination))
{
- if (logger.isTraceEnabled())
- logger.trace("Skipping dead replica {} for mutation {}",
destination, mutation.id());
- handler.expired(); // immediately mark the response as expired
since the request will not be sent
+ logger.trace("Skipping dead replica {} for mutation {}",
destination, mutation.id());
+ // Only call expired() for AbstractWriteResponseHandler (not
for LeaderCallback)
+ if (handler instanceof AbstractWriteResponseHandler)
+ ((AbstractWriteResponseHandler<?>) handler).expired(); //
immediately mark the response as expired since the request will not be sent
continue;
}
if (destination.isSelf())
{
- applyLocally = true;
+ foundSelf = true; // Mutation was already applied locally
continue;
}
if (message == null)
{
- message = Message.builder(MUTATION_REQ, mutation)
- .withRequestTime(handler.getRequestTime())
- .withFlag(MessageFlag.CALL_BACK_ON_FAILURE)
- .build();
+ Message.Builder<Mutation> builder =
Message.builder(MUTATION_REQ, mutation)
+
.withRequestTime(requestTime)
+
.withFlag(MessageFlag.CALL_BACK_ON_FAILURE);
+
+ // If this is a forwarded write, include coordinator ack info
so replicas
+ // know to respond to the original coordinator, not this leader
+ if (coordinatorAckInfo != null)
+ builder.withParam(ParamType.COORDINATOR_ACK_INFO,
coordinatorAckInfo);
+
+ message = builder.build();
}
String dc =
DatabaseDescriptor.getLocator().location(destination.endpoint()).datacenter;
@@ -174,16 +299,12 @@ public class TrackedWriteRequest
{
if (remoteDCReplicas == null)
remoteDCReplicas = new HashMap<>();
-
- List<Replica> replicas = remoteDCReplicas.get(dc);
- if (replicas == null)
- replicas = remoteDCReplicas.computeIfAbsent(dc, ignore ->
new ArrayList<>(3)); // most DCs will have <= 3 replicas
- replicas.add(destination);
+ remoteDCReplicas.computeIfAbsent(dc, ignore -> new
ArrayList<>(3)) // most DCs will have <= 3 replicas
+ .add(destination);
}
}
- Preconditions.checkState(applyLocally); // the coordinator is always a
replica
- applyMutationLocally(mutation, handler);
+ Preconditions.checkState(foundSelf, "Coordinator must be a replica");
IntHashSet remoteReplicas = null;
if (localDCReplicas != null || remoteDCReplicas != null)
@@ -193,9 +314,13 @@ public class TrackedWriteRequest
{
for (Replica replica : localDCReplicas)
{
- if (logger.isTraceEnabled())
- logger.trace("Sending mutation {} to local replica {}",
mutation.id(), replica);
- MessagingService.instance().sendWriteWithCallback(message,
replica, handler);
+ logger.trace("Sending mutation {} to local replica {}",
mutation.id(), replica);
+ // Use appropriate send method based on handler type
+ if (handler instanceof AbstractWriteResponseHandler)
+ MessagingService.instance().sendWriteWithCallback(message,
replica, (AbstractWriteResponseHandler<?>) handler);
+ else
+ MessagingService.instance().sendWithCallback(message,
replica.endpoint(), handler);
+
remoteReplicas.add(ClusterMetadata.current().directory.peerId(replica.endpoint()).id());
}
}
@@ -205,20 +330,69 @@ public class TrackedWriteRequest
// for each datacenter, send the message to one node to relay the
write to other replicas
for (List<Replica> dcReplicas : remoteDCReplicas.values())
{
- if (logger.isTraceEnabled())
- logger.trace("Sending mutation {} to remote dc replicas
{}", mutation.id(), dcReplicas);
- sendMessagesToRemoteDC(message,
EndpointsForToken.copyOf(mutation.key().getToken(), dcReplicas), handler, null);
+ logger.trace("Sending mutation {} to remote dc replicas {}",
mutation.id(), dcReplicas);
+ sendMessagesToRemoteDC(message,
EndpointsForToken.copyOf(mutation.key().getToken(), dcReplicas), handler,
coordinatorAckInfo);
for (Replica replica : dcReplicas)
remoteReplicas.add(ClusterMetadata.current().directory.peerId(replica.endpoint()).id());
}
}
if (remoteReplicas != null)
- {
- if (logger.isTraceEnabled())
- logger.trace("Sending mutation {} to remote replicas {}",
mutation.id(), remoteReplicas);
MutationTrackingService.instance.sentWriteRequest(mutation,
remoteReplicas);
+ }
+
+ /*
+ * Send the message to the first replica of targets, and have it forward
the message to others in its DC
+ */
+ static void sendMessagesToRemoteDC(Message<? extends IMutation> message,
+ EndpointsForToken targets,
+ RequestCallback<NoPayload> handler,
+ ForwardedWrite.CoordinatorAckInfo ackTo)
+ {
+ final Replica target;
+
+ if (targets.size() > 1)
+ {
+ target = pickReplica(targets);
+ EndpointsForToken forwardToReplicas = targets.filter(r -> r !=
target, targets.size());
+
+ for (Replica replica : forwardToReplicas)
+ {
+ if (handler instanceof TrackedWriteResponseHandler)
+
MessagingService.instance().callbacks.addWithExpiration((TrackedWriteResponseHandler)
handler, message, replica);
+ else if (handler instanceof ForwardedWrite.LeaderCallback)
+
MessagingService.instance().callbacks.addWithExpiration(handler, message,
replica.endpoint());
+ else
+ throw new IllegalStateException();
+ logger.trace("Adding FWD message to {}@{}", message.id(),
replica);
+ }
+
+ // starting with 4.0, use the same message id for all replicas
+ long[] messageIds = new long[forwardToReplicas.size()];
+ Arrays.fill(messageIds, message.id());
+
+ message = message.withForwardTo(new
ForwardingInfo(forwardToReplicas.endpointList(), messageIds));
+ }
+ else
+ {
+ target = targets.get(0);
}
+ if (ackTo != null)
+ message = message.withParam(ParamType.COORDINATOR_ACK_INFO, ackTo);
+
+ Tracing.trace("Sending mutation to remote replica {}", target);
+ if (handler instanceof ForwardedWrite.LeaderCallback)
+
MessagingService.instance().sendForwardedWriteWithCallback(message, target,
(ForwardedWrite.LeaderCallback) handler);
+ else
+ MessagingService.instance().sendWriteWithCallback(message, target,
(AbstractWriteResponseHandler<?>) handler);
+ logger.trace("Sending message to {}@{}", message.id(), target);
+ }
+
+ private static Replica pickReplica(EndpointsForToken targets)
+ {
+ EndpointsForToken healthy = targets.filter(r ->
DynamicEndpointSnitch.getSeverity(r.endpoint()) == 0);
+ EndpointsForToken select = healthy.isEmpty() ? targets : healthy;
+ return select.get(ThreadLocalRandom.current().nextInt(0,
select.size()));
}
static void applyMutationLocally(Mutation mutation,
RequestCallback<NoPayload> handler)
@@ -227,6 +401,13 @@ public class TrackedWriteRequest
Stage.MUTATION.maybeExecuteImmediately(new
LocalMutationRunnable(mutation, handler));
}
+ static void applyCounterMutationLocally(CounterMutation counterMutation,
+ ReplicaPlan.ForWrite plan,
+ TrackedWriteResponseHandler
handler)
+ {
+ Stage.COUNTER_MUTATION.maybeExecuteImmediately(new
LocalCounterMutationRunnable(counterMutation, plan, handler));
+ }
+
private static class LocalMutationRunnable implements
DebuggableTask.RunnableDebuggableTask
{
private final Mutation mutation;
@@ -239,20 +420,11 @@ public class TrackedWriteRequest
this.handler = handler;
}
- private Dispatcher.RequestTime getRequestTime()
- {
- if (handler instanceof TrackedWriteResponseHandler)
- return ((TrackedWriteResponseHandler)
handler).getRequestTime();
- if (handler instanceof ForwardedWrite.LeaderCallback)
- return ((ForwardedWrite.LeaderCallback)
handler).getRequestTime();
- throw new IllegalStateException();
- }
-
@Override
public final void run()
{
long now = MonotonicClock.Global.approxTime.now();
- long deadline =
getRequestTime().computeDeadline(MUTATION_REQ.expiresAfterNanos());
+ long deadline =
getRequestTime(handler).computeDeadline(MUTATION_REQ.expiresAfterNanos());
if (now > deadline)
{
@@ -277,13 +449,13 @@ public class TrackedWriteRequest
@Override
public long creationTimeNanos()
{
- return getRequestTime().enqueuedAtNanos();
+ return getRequestTime(handler).enqueuedAtNanos();
}
@Override
public long startTimeNanos()
{
- return getRequestTime().startedAtNanos();
+ return getRequestTime(handler).startedAtNanos();
}
@Override
@@ -295,57 +467,76 @@ public class TrackedWriteRequest
}
}
- /*
- * Send the message to the first replica of targets, and have it forward
the message to others in its DC
- */
- static void sendMessagesToRemoteDC(Message<? extends IMutation> message,
- EndpointsForToken targets,
- RequestCallback<NoPayload> handler,
- ForwardedWrite.CoordinatorAckInfo ackTo)
+ private static class LocalCounterMutationRunnable implements
DebuggableTask.RunnableDebuggableTask
{
- final Replica target;
+ private final CounterMutation counterMutation;
+ private final ReplicaPlan.ForWrite plan;
+ private final TrackedWriteResponseHandler handler;
- if (targets.size() > 1)
+ LocalCounterMutationRunnable(CounterMutation counterMutation,
ReplicaPlan.ForWrite plan, TrackedWriteResponseHandler handler)
{
- target = pickReplica(targets);
- EndpointsForToken forwardToReplicas = targets.filter(r -> r !=
target, targets.size());
+ this.counterMutation = counterMutation;
+ this.plan = plan;
+ this.handler = handler;
+ }
- for (Replica replica : forwardToReplicas)
+ private Dispatcher.RequestTime getReqestTime()
+ {
+ return handler.getRequestTime();
+ }
+
+ @Override
+ public void run()
+ {
+ long now = MonotonicClock.Global.approxTime.now();
+ long deadline =
getReqestTime().computeDeadline(COUNTER_MUTATION_REQ.expiresAfterNanos());
+
+ if (now > deadline)
{
- if (handler instanceof TrackedWriteResponseHandler)
-
MessagingService.instance().callbacks.addWithExpiration((TrackedWriteResponseHandler)
handler, message, replica);
- else if (handler instanceof ForwardedWrite.LeaderCallback)
-
MessagingService.instance().callbacks.addWithExpiration(handler, message,
replica.endpoint());
- else
- throw new IllegalStateException();
- logger.trace("Adding FWD message to {}@{}", message.id(),
replica);
+ long timeTakenNanos = now - startTimeNanos();
+
MessagingService.instance().metrics.recordSelfDroppedMessage(COUNTER_MUTATION_REQ,
timeTakenNanos, NANOSECONDS);
+ return;
}
- // starting with 4.0, use the same message id for all replicas
- long[] messageIds = new long[forwardToReplicas.size()];
- Arrays.fill(messageIds, message.id());
+ try
+ {
+ Mutation result =
counterMutation.applyCounterMutation(counterMutation.id());
+ handler.onResponse(null);
+ sendToReplicas(result, plan, handler, null);
+ }
+ catch (Exception ex)
+ {
+ if(!(ex instanceof WriteTimeoutException))
+ logger.error("Failed to apply counter mutation locally:
", ex);
+ handler.onFailure(FBUtilities.getBroadcastAddressAndPort(),
RequestFailure.forException(ex));
+ }
+ }
- message = message.withForwardTo(new
ForwardingInfo(forwardToReplicas.endpointList(), messageIds));
+ @Override
+ public long creationTimeNanos()
+ {
+ return getReqestTime().enqueuedAtNanos();
}
- else
+
+ @Override
+ public long startTimeNanos()
{
- target = targets.get(0);
+ return getReqestTime().startedAtNanos();
}
- if (ackTo != null)
- message = message.withParam(ParamType.COORDINATOR_ACK_INFO, ackTo);
- Tracing.trace("Sending mutation to remote replica {}", target);
- if (handler instanceof ForwardedWrite.LeaderCallback)
-
MessagingService.instance().sendForwardedWriteWithCallback(message, target,
(ForwardedWrite.LeaderCallback) handler);
- else
- MessagingService.instance().sendWriteWithCallback(message, target,
(AbstractWriteResponseHandler<?>) handler);
- logger.trace("Sending message to {}@{}", message.id(), target);
+ @Override
+ public String description()
+ {
+ return counterMutation.toString();
+ }
}
- private static Replica pickReplica(EndpointsForToken targets)
+ private static Dispatcher.RequestTime getRequestTime(RequestCallback<?>
callback)
{
- EndpointsForToken healthy = targets.filter(r ->
DynamicEndpointSnitch.getSeverity(r.endpoint()) == 0);
- EndpointsForToken select = healthy.isEmpty() ? targets : healthy;
- return select.get(ThreadLocalRandom.current().nextInt(0,
select.size()));
+ if (callback instanceof TrackedWriteResponseHandler)
+ return ((TrackedWriteResponseHandler) callback).getRequestTime();
+ if (callback instanceof ForwardedWrite.LeaderCallback)
+ return ((ForwardedWrite.LeaderCallback) callback).getRequestTime();
+ throw new IllegalStateException();
}
}
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java
b/src/java/org/apache/cassandra/service/StorageProxy.java
index cde6ad1d58..2324edce54 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -190,7 +190,6 @@ import
org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import static accord.primitives.Txn.Kind.Read;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Iterables.concat;
-import static java.util.Collections.singleton;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.db.ConsistencyLevel.SERIAL;
@@ -958,71 +957,6 @@ public class StorageProxy implements StorageProxyMBean
});
}
- /**
- * Performs a coordinated write with mutation tracking.
- * Assumes that local coordinator is a replica (forwarding implementation
pending).
- *
- * @param mutation
- * @param consistencyLevel
- * @param requestTime
- */
- public static void mutateWithTracking(Mutation mutation, ConsistencyLevel
consistencyLevel, Dispatcher.RequestTime requestTime)
- {
- try
- {
- TrackedWriteRequest.perform(mutation, consistencyLevel,
requestTime).get();
- }
- catch (WriteTimeoutException|WriteFailureException ex)
- {
- if (consistencyLevel == ConsistencyLevel.ANY)
- {
- // TODO (expected): what exactly?
- }
- else
- {
- if (ex instanceof WriteFailureException)
- {
- writeMetrics.failures.mark();
- writeMetricsForLevel(consistencyLevel).failures.mark();
- WriteFailureException fe = (WriteFailureException)ex;
- Tracing.trace("Write failure; received {} of {} required
replies, failed {} requests",
- fe.received, fe.blockFor,
fe.failureReasonByEndpoint.size());
- }
- else
- {
- writeMetrics.timeouts.mark();
- writeMetricsForLevel(consistencyLevel).timeouts.mark();
- WriteTimeoutException te = (WriteTimeoutException)ex;
- Tracing.trace("Write timeout; received {} of {} required
replies", te.received, te.blockFor);
- }
- throw ex;
- }
- }
- catch (UnavailableException e)
- {
- writeMetrics.unavailables.mark();
- writeMetricsForLevel(consistencyLevel).unavailables.mark();
- Tracing.trace("Unavailable");
- throw e;
- }
- catch (OverloadedException e)
- {
- writeMetrics.unavailables.mark();
- writeMetricsForLevel(consistencyLevel).unavailables.mark();
- Tracing.trace("Overloaded");
- throw e;
- }
- finally
- {
- // We track latency based on request processing time, since the
amount of time that request spends in the queue
- // is not a representative metric of replica performance.
- long latency = nanoTime() - requestTime.startedAtNanos();
- writeMetrics.addNano(latency);
- writeMetricsForLevel(consistencyLevel).addNano(latency);
- updateCoordinatorWriteLatencyTableMetric(singleton(mutation),
latency);
- }
- }
-
/**
* Use this method to have these Mutations applied
* across all replicas. This method will take care
@@ -1322,8 +1256,6 @@ public class StorageProxy implements StorageProxyMBean
boolean isTracked =
Schema.instance.getKeyspaceMetadata(mutations.get(0).getKeyspaceName()).params.replicationType.isTracked();
if (isTracked)
{
- if (mutations.stream().anyMatch(m -> m instanceof CounterMutation))
- throw new InvalidRequestException("Mutation tracking is
currently unsupported with counters");
if (augmented != null)
throw new InvalidRequestException("Mutation tracking is
currently unsupported with triggers");
if (mutateAtomically)
@@ -1366,7 +1298,7 @@ public class StorageProxy implements StorageProxyMBean
{
for (IMutation trackedMutation : trackedMutations)
{
-
trackedHandlers.add(TrackedWriteRequest.perform((Mutation) trackedMutation,
consistencyLevel, requestTime));
+
trackedHandlers.add(TrackedWriteRequest.perform(trackedMutation,
consistencyLevel, requestTime));
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/TrackedCounterForwardingTest.java
b/test/distributed/org/apache/cassandra/distributed/test/TrackedCounterForwardingTest.java
new file mode 100644
index 0000000000..656b3be4f5
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/TrackedCounterForwardingTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+
+/**
+ * Tests the case where a coordinator that is NOT a replica MUST forward
+ * the counter write to a replica leader.
+ *
+ * With 6 nodes and RF=3, we can pick a coordinator that is definitely
+ * not a replica for a specific partition.
+ */
+public class TrackedCounterForwardingTest extends TestBaseImpl
+{
+ @Test
+ public void testForwardedTrackedCounterWrites() throws Throwable
+ {
+ try (Cluster cluster = Cluster.build(6)
+ .withConfig(c -> c.with(GOSSIP,
NATIVE_PROTOCOL))
+ .start())
+ {
+ cluster.schemaChange("CREATE KEYSPACE k WITH replication = " +
+ "{'class': 'SimpleStrategy',
'replication_factor': 3} AND replication_type='tracked'");
+
+ cluster.schemaChange("CREATE TABLE k.counters (pk int PRIMARY KEY,
count counter)");
+
+ ConsistencyLevel cl = ConsistencyLevel.QUORUM;
+
+ // Test counter writes from all nodes
+ // Some will be local (coordinator is replica), some will be
forwarded (coordinator not replica)
+ for (int coordinatorNode = 1; coordinatorNode <= 6;
coordinatorNode++)
+ {
+ ICoordinator coordinator =
cluster.coordinator(coordinatorNode);
+ int pk = coordinatorNode * 100;
+
+ // Increment
+ coordinator.execute("UPDATE k.counters SET count = count + 5
WHERE pk = ?", cl, pk);
+ assertRows(coordinator.execute("SELECT count FROM k.counters
WHERE pk = ?", cl, pk), row(5L));
+
+ // Increment again
+ coordinator.execute("UPDATE k.counters SET count = count + 3
WHERE pk = ?", cl, pk);
+ assertRows(coordinator.execute("SELECT count FROM k.counters
WHERE pk = ?", cl, pk), row(8L));
+ }
+
+ // Verify all nodes can read all counters
+ for (int node = 1; node <= 6; node++)
+ {
+ ICoordinator coordinator = cluster.coordinator(node);
+ for (int pk = 100; pk <= 600; pk += 100)
+ {
+ assertRows(coordinator.execute("SELECT count FROM
k.counters WHERE pk = ?", ConsistencyLevel.ONE, pk),
+ row(8L));
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/test/unit/org/apache/cassandra/replication/TrackedCounterWriteTest.java
b/test/unit/org/apache/cassandra/replication/TrackedCounterWriteTest.java
new file mode 100644
index 0000000000..4b49d52fc1
--- /dev/null
+++ b/test/unit/org/apache/cassandra/replication/TrackedCounterWriteTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.replication;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.CounterMutation;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.ReplicationType;
+import org.apache.cassandra.service.AbstractWriteResponseHandler;
+import org.apache.cassandra.transport.Dispatcher;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Unit tests for Token-Aware (Normal Path) Counter Mutations for Mutation
Tracking.
+ * In these tests, the coordinator IS a replica.
+ */
+public class TrackedCounterWriteTest
+{
+ private static final Logger logger =
LoggerFactory.getLogger(TrackedCounterWriteTest.class);
+
+ private static final String KEYSPACE_TRACKED = "TrackedCounterTest";
+ private static final String CF_COUNTER = "Counter1";
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+
+ SchemaLoader.createKeyspace(KEYSPACE_TRACKED,
+ KeyspaceParams.simple(3,
ReplicationType.tracked),
+ SchemaLoader.counterCFMD(KEYSPACE_TRACKED,
CF_COUNTER));
+ }
+
+ /**
+ * Tests that counter writes are properly tracked when the coordinator is
a replica.
+ * Verifies that:
+ * 1. CounterMutation initially has no ID before perform()
+ * 2. TrackedWriteRequest.perform() successfully processes the mutation
+ * 3. The counter value is correctly incremented
+ */
+ @Test
+ public void testTrackedCounterWrite_CoordinatorIsReplica() throws Exception
+ {
+ ColumnFamilyStore cfs =
Keyspace.open(KEYSPACE_TRACKED).getColumnFamilyStore(CF_COUNTER);
+ cfs.truncateBlocking();
+
+ ColumnMetadata cDef =
cfs.metadata().getColumn(ByteBufferUtil.bytes("val"));
+
+ Mutation m = new RowUpdateBuilder(cfs.metadata(),
System.currentTimeMillis(), "testkey1")
+ .clustering("cc")
+ .add("val", 5L)
+ .build();
+
+ CounterMutation counterMutation = new CounterMutation(m,
ConsistencyLevel.ONE);
+
+ assertTrue("CounterMutation should not have an ID before perform()",
+ counterMutation.id().isNone());
+
+ AbstractWriteResponseHandler<?> handler = TrackedWriteRequest.perform(
+ counterMutation,
+ ConsistencyLevel.ONE,
+ Dispatcher.RequestTime.forImmediateExecution()
+ );
+ assertNotNull("Handler should not be null", handler);
+
+ Thread.sleep(100); // Waiting for async operations
+
+ Row row =
Util.getOnlyRow(Util.cmd(cfs).includeRow("cc").columns("val").build());
+ long counterValue = CounterContext.instance().total(row.getCell(cDef));
+ assertEquals("Counter should be incremented to 5", 5L, counterValue);
+ }
+
+ /**
+ * Tests that multiple counter updates on the same key accumulate
correctly with tracking.
+ * Verifies that:
+ * 1. Multiple sequential increments/decrements are properly tracked
+ * 2. Counter values accumulate correctly across updates
+ */
+ @Test
+ public void testTrackedCounterWrite_MultipleIncrements() throws Exception
+ {
+ ColumnFamilyStore cfs =
Keyspace.open(KEYSPACE_TRACKED).getColumnFamilyStore(CF_COUNTER);
+ cfs.truncateBlocking();
+
+ ColumnMetadata cDef =
cfs.metadata().getColumn(ByteBufferUtil.bytes("val"));
+
+ performCounterUpdate(cfs, "testkey2", 3L); // First increment (+3)
+
+ Row row =
Util.getOnlyRow(Util.cmd(cfs).includeRow("cc").columns("val").build());
+ long value = CounterContext.instance().total(row.getCell(cDef));
+ assertEquals("Counter should be 3", 3L, value);
+
+ performCounterUpdate(cfs, "testkey2", 7L); // Second increment (+7)
+
+ row =
Util.getOnlyRow(Util.cmd(cfs).includeRow("cc").columns("val").build());
+ value = CounterContext.instance().total(row.getCell(cDef));
+ assertEquals("Counter should be 10", 10L, value);
+
+ performCounterUpdate(cfs, "testkey2", -4L); // Decrement (-4)
+
+ row =
Util.getOnlyRow(Util.cmd(cfs).includeRow("cc").columns("val").build());
+ value = CounterContext.instance().total(row.getCell(cDef));
+ assertEquals("Counter should be 6", 6L, value);
+ }
+
+ /**
+ * Performs a counter update, then sleeps for 50 ms to give the asynchronous write time to complete
+ */
+ private void performCounterUpdate(ColumnFamilyStore cfs, String key, long
delta) throws Exception
+ {
+ Mutation m = new RowUpdateBuilder(cfs.metadata(),
System.currentTimeMillis(), key)
+ .clustering("cc")
+ .add("val", delta)
+ .build();
+
+ CounterMutation counterMutation = new CounterMutation(m,
ConsistencyLevel.ONE);
+
+ TrackedWriteRequest.perform(
+ counterMutation,
+ ConsistencyLevel.ONE,
+ Dispatcher.RequestTime.forImmediateExecution()
+ );
+
+ Thread.sleep(50);
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]