This is an automated email from the ASF dual-hosted git repository.
aleksey pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-45-mutation-tracking by
this push:
new e0d2daa86c Implement coordinator log offset broadcasting
e0d2daa86c is described below
commit e0d2daa86ce34d66dd1444adb37a56617c0c6703
Author: Aleksey Yeschenko <[email protected]>
AuthorDate: Fri Apr 4 12:38:28 2025 +0100
Implement coordinator log offset broadcasting
patch by Aleksey Yeschenko; reviewed by Blake Eggleston for CASSANDRA-20576
---
src/java/org/apache/cassandra/net/Verb.java | 12 +-
.../cassandra/replication/CoordinatorLog.java | 159 ++++++++++++++-------
.../cassandra/replication/CoordinatorLogId.java | 17 ++-
.../cassandra/replication/ForwardedWrite.java | 2 +-
.../cassandra/replication/MutationSummary.java | 8 ++
.../replication/MutationTrackingService.java | 129 +++++++++++++++--
.../org/apache/cassandra/replication/Offsets.java | 21 +--
.../apache/cassandra/replication/Participants.java | 5 +
.../org/apache/cassandra/replication/Shard.java | 59 +++++++-
.../replication/ShardReplicatedOffsets.java | 108 ++++++++++++++
.../cassandra/replication/TrackedWriteRequest.java | 12 +-
...articipants.java => UnreconciledMutations.java} | 44 ++----
...ates.java => UnreconciledMutationsReplica.java} | 17 ++-
.../service/TrackedWriteResponseHandler.java | 2 +-
.../service/reads/tracked/ReadReconcileNotify.java | 2 +-
.../service/reads/tracked/ReadReconciliations.java | 4 +-
.../test/tracking/OffsetBroadcastTest.java | 79 ++++++++++
.../cassandra/replication/CoordinatorLogTest.java | 6 +-
.../apache/cassandra/replication/OffsetsTest.java | 10 +-
19 files changed, 549 insertions(+), 147 deletions(-)
diff --git a/src/java/org/apache/cassandra/net/Verb.java
b/src/java/org/apache/cassandra/net/Verb.java
index 100aca623d..c6b44ce66b 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -72,6 +72,7 @@ import org.apache.cassandra.repair.messages.SyncRequest;
import org.apache.cassandra.repair.messages.ValidationResponse;
import org.apache.cassandra.repair.messages.ValidationRequest;
import org.apache.cassandra.replication.ForwardedWrite;
+import org.apache.cassandra.replication.ShardReplicatedOffsets;
import org.apache.cassandra.schema.SchemaMutationsSerializer;
import org.apache.cassandra.schema.SchemaPullVerbHandler;
import org.apache.cassandra.schema.SchemaPushVerbHandler;
@@ -246,11 +247,12 @@ public enum Verb
TCM_FETCH_PEER_LOG_RSP (818, P0, rpcTimeout, FETCH_LOG,
MessageSerializers::logStateSerializer, () ->
ResponseVerbHandler.instance ),
TCM_FETCH_PEER_LOG_REQ (819, P0, rpcTimeout, FETCH_LOG, ()
-> FetchPeerLog.serializer, () ->
FetchPeerLog.Handler.instance, TCM_FETCH_PEER_LOG_RSP ),
- // logged replication
- READ_RECONCILE_SEND (901, P0, rpcTimeout, READ, ()
-> ReadReconcileSend.serializer, () ->
ReadReconcileSend.verbHandler),
- READ_RECONCILE_RCV (902, P0, rpcTimeout, MUTATION, ()
-> ReadReconcileReceive.serializer, () ->
ReadReconcileReceive.verbHandler),
- READ_RECONCILE_NOTIFY (903, P0, rpcTimeout, REQUEST_RESPONSE, ()
-> ReadReconcileNotify.serializer, () ->
ReadReconcileNotify.verbHandler),
- FORWARDING_WRITE (904, P3, writeTimeout, MUTATION, ()
-> ForwardedWrite.Request.serializer, () ->
ForwardedWrite.verbHandler),
+ // tracked replication
+ READ_RECONCILE_SEND (901, P0, rpcTimeout, READ, ()
-> ReadReconcileSend.serializer, () ->
ReadReconcileSend.verbHandler ),
+ READ_RECONCILE_RCV (902, P0, rpcTimeout, MUTATION, ()
-> ReadReconcileReceive.serializer, () ->
ReadReconcileReceive.verbHandler ),
+ READ_RECONCILE_NOTIFY (903, P0, rpcTimeout, REQUEST_RESPONSE, ()
-> ReadReconcileNotify.serializer, () ->
ReadReconcileNotify.verbHandler ),
+ FORWARDING_WRITE (904, P3, writeTimeout, MUTATION, ()
-> ForwardedWrite.Request.serializer, () ->
ForwardedWrite.verbHandler ),
+ BROADCAST_LOG_OFFSETS (905, P1, rpcTimeout, MISC, ()
-> ShardReplicatedOffsets.serializer, () ->
ShardReplicatedOffsets.verbHandler),
INITIATE_DATA_MOVEMENTS_RSP (814, P1, rpcTimeout, MISC, () ->
NoPayload.serializer, () -> ResponseVerbHandler.instance
),
INITIATE_DATA_MOVEMENTS_REQ (815, P1, rpcTimeout, MISC, () ->
DataMovement.serializer, () -> DataMovementVerbHandler.instance,
INITIATE_DATA_MOVEMENTS_RSP ),
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index 0271d2eccf..7dad88cf97 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -21,15 +21,17 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.schema.TableId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
@@ -41,29 +43,26 @@ public abstract class CoordinatorLog
protected final CoordinatorLogId logId;
protected final Participants participants;
- /**
- * State machines and an Id <-> token index for unreconciled mutation ids
that exist on this host.
- */
- private final LocalMutationStates unreconciledMutations;
+ protected final Offsets[] witnessedOffsets;
+ protected final Offsets reconciledOffsets;
- protected final Offsets[] witnessedIds;
- protected final Offsets reconciledIds;
protected final ReadWriteLock lock;
+ abstract UnreconciledMutations unreconciledMutations();
+
CoordinatorLog(int localHostId, CoordinatorLogId logId, Participants
participants)
{
this.localHostId = localHostId;
this.logId = logId;
this.participants = participants;
- this.unreconciledMutations = new LocalMutationStates();
this.lock = new ReentrantReadWriteLock();
Offsets[] ids = new Offsets[participants.size()];
- for (int i = 0; i < participants.size(); i++)
+ for (int i = 0, size = participants.size(); i < size; ++i)
ids[i] = new Offsets(logId);
- witnessedIds = ids;
- reconciledIds = new Offsets(logId);
+ witnessedOffsets = ids;
+ reconciledOffsets = new Offsets(logId);
}
static CoordinatorLog create(int localHostId, CoordinatorLogId id,
Participants participants)
@@ -72,7 +71,7 @@ public abstract class CoordinatorLog
: new
CoordinatorLogReplica(localHostId, id, participants);
}
- void witnessedRemoteMutation(MutationId mutationId, int onHostId)
+ void receivedWriteResponse(MutationId mutationId, int onHostId)
{
Preconditions.checkArgument(!mutationId.isNone());
logger.trace("witnessed remote mutation {} from {}", mutationId,
onHostId);
@@ -80,26 +79,17 @@ public abstract class CoordinatorLog
try
{
if (!get(onHostId).add(mutationId.offset()))
- return; // already witnessed
+ return; // already witnessed; very uncommon but possible path
if (!getLocal().contains(mutationId.offset()))
- return; // local host hasn't witnessed -> no cleanup needed
+ return; // local host hasn't witnessed yet -> no cleanup needed
- // see if any other replicas haven't witnessed the id yet
- boolean allOtherReplicasWitnessed = true;
- for (int i = 0; i < participants.size() &&
allOtherReplicasWitnessed; i++)
- {
- int hostId = participants.get(i);
- if (hostId != onHostId && hostId != localHostId &&
!get(hostId).contains(mutationId.offset()))
- allOtherReplicasWitnessed = false;
- }
-
- if (allOtherReplicasWitnessed)
+ if (hasWrittenToRemoteReplicas(mutationId.offset()))
{
logger.trace("marking mutation {} as fully reconciled",
mutationId);
// if all replicas have now witnessed the id, remove it from
the index
- unreconciledMutations.remove(mutationId.offset());
- reconciledIds.add(mutationId.offset());
+ unreconciledMutations().remove(mutationId.offset());
+ reconciledOffsets.add(mutationId.offset());
}
}
finally
@@ -108,6 +98,46 @@ public abstract class CoordinatorLog
}
}
+ void updateReplicatedOffsets(Offsets offsets, int onHostId)
+ {
+ lock.writeLock().lock();
+ try
+ {
+ get(onHostId).addAll(offsets, (ignore, start, end) ->
+ {
+ for (int offset = start; offset <= end; ++offset)
+ {
+ // TODO (desired): skip checking the host's offsets - all
just added
+ // TODO (desired): use the fact that Offsets are ordered
to optimise this look up
+ if (hasWrittenLocally(offset) &&
hasWrittenToRemoteReplicas(offset))
+ {
+ reconciledOffsets.add(offset);
+ unreconciledMutations().remove(offset);
+ }
+ }
+ });
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
+ @Nullable
+ Offsets collectReplicatedOffsets()
+ {
+ lock.readLock().lock();
+ try
+ {
+ Offsets offsets =
witnessedOffsets[participants.indexOf(localHostId)];
+ return offsets.isEmpty() ? null : offsets.copy();
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ }
+
void startWriting(Mutation mutation)
{
lock.writeLock().lock();
@@ -116,7 +146,7 @@ public abstract class CoordinatorLog
if (getLocal().contains(mutation.id().offset()))
return; // already witnessed; shouldn't get to this path often
(duplicate mutation)
- unreconciledMutations.startWriting(mutation);
+ unreconciledMutations().startWriting(mutation);
}
finally
{
@@ -127,27 +157,21 @@ public abstract class CoordinatorLog
void finishWriting(Mutation mutation)
{
logger.trace("witnessed local mutation {}", mutation.id());
+
lock.writeLock().lock();
try
{
- if (!getLocal().add(mutation.id().offset()))
- throw new IllegalStateException("finishWriting() called on a
reconciled mutation");
+ int offset = mutation.id().offset();
+ if (!getLocal().add(offset))
+ throw new IllegalStateException("finishWriting() called on a
locally witnessed mutation " + mutation.id());
+
+ unreconciledMutations().finishWriting(mutation);
- // see if any other replicas haven't witnessed the id yet
- boolean allOtherReplicasWitnessed = true;
- for (int i = 0; i < participants.size() &&
allOtherReplicasWitnessed; i++)
+ if (hasWrittenToRemoteReplicas(offset))
{
- int hostId = participants.get(i);
- if (hostId != localHostId &&
!get(hostId).contains(mutation.id().offset()))
- allOtherReplicasWitnessed = false;
+ reconciledOffsets.add(offset);
+ unreconciledMutations().remove(offset);
}
-
- // if some replicas also haven't witnessed the mutation yet, we
should update local mutation state;
- // otherwise we are the last node to witness this mutation, and
can clean it up
- if (allOtherReplicasWitnessed)
- reconciledIds.add(mutation.id().offset());
- else
- unreconciledMutations.finishWriting(mutation);
}
finally
{
@@ -155,6 +179,22 @@ public abstract class CoordinatorLog
}
}
+ private boolean hasWrittenLocally(int offset)
+ {
+ return getLocal().contains(offset);
+ }
+
+ private boolean hasWrittenToRemoteReplicas(int offset)
+ {
+ for (int i = 0; i < participants.size(); ++i)
+ {
+ int hostId = participants.get(i);
+ if (hostId != localHostId && !get(hostId).contains(offset))
+ return false;
+ }
+ return true;
+ }
+
/**
* Look up unreconciled sequence ids of mutations witnessed by this host
in this coordinator log.
* Adds the ids to the supplied collection, so it can be reused to
aggregate lookups for multiple logs.
@@ -164,8 +204,8 @@ public abstract class CoordinatorLog
lock.readLock().lock();
try
{
- reconciledInto.addAll(reconciledIds);
- return unreconciledMutations.collect(token, tableId,
includePending, unreconciledInto);
+ reconciledInto.addAll(reconciledOffsets);
+ return unreconciledMutations().collect(token, tableId,
includePending, unreconciledInto);
}
finally
{
@@ -182,8 +222,8 @@ public abstract class CoordinatorLog
lock.readLock().lock();
try
{
- reconciledInto.addAll(reconciledIds);
- return unreconciledMutations.collect(range, tableId,
includePending, unreconciledInto);
+ reconciledInto.addAll(reconciledOffsets);
+ return unreconciledMutations().collect(range, tableId,
includePending, unreconciledInto);
}
finally
{
@@ -193,21 +233,29 @@ public abstract class CoordinatorLog
protected Offsets get(int hostId)
{
- return witnessedIds[participants.indexOf(hostId)];
+ return witnessedOffsets[participants.indexOf(hostId)];
}
protected Offsets getLocal()
{
- return witnessedIds[participants.indexOf(localHostId)];
+ return witnessedOffsets[participants.indexOf(localHostId)];
}
public static class CoordinatorLogPrimary extends CoordinatorLog
{
- AtomicLong sequenceId = new AtomicLong(-1);
+ private final AtomicLong sequenceId = new AtomicLong(-1);
+ private final UnreconciledMutationsReplica unreconciledMutations;
CoordinatorLogPrimary(int localHostId, CoordinatorLogId logId,
Participants participants)
{
super(localHostId, logId, participants);
+ unreconciledMutations = new UnreconciledMutationsReplica();
+ }
+
+ @Override
+ UnreconciledMutationsReplica unreconciledMutations()
+ {
+ return unreconciledMutations;
}
MutationId nextId()
@@ -235,9 +283,18 @@ public abstract class CoordinatorLog
public static class CoordinatorLogReplica extends CoordinatorLog
{
+ private final UnreconciledMutationsReplica unreconciledMutations;
+
CoordinatorLogReplica(int localHostId, CoordinatorLogId logId,
Participants participants)
{
super(localHostId, logId, participants);
+ this.unreconciledMutations = new UnreconciledMutationsReplica();
+ }
+
+ @Override
+ UnreconciledMutationsReplica unreconciledMutations()
+ {
+ return unreconciledMutations;
}
}
}
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
index 37d4f25851..f7dd55cce7 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
@@ -118,7 +118,7 @@ public class CoordinatorLogId implements Serializable
public static final Comparator<CoordinatorLogId> comparator = (l, r) ->
Long.compareUnsigned(l.asLong(), r.asLong());
- public static final IVersionedSerializer<CoordinatorLogId> serializer =
new IVersionedSerializer<>()
+ static final class Serializer implements
IVersionedSerializer<CoordinatorLogId>
{
@Override
public void serialize(CoordinatorLogId logId, DataOutputPlus out, int
version) throws IOException
@@ -127,6 +127,12 @@ public class CoordinatorLogId implements Serializable
out.writeInt(logId.hostLogId);
}
+ public void serialize(long logId, DataOutputPlus out, int version)
throws IOException
+ {
+ out.writeInt(hostId(logId));
+ out.writeInt(hostLogId(logId));
+ }
+
@Override
public CoordinatorLogId deserialize(DataInputPlus in, int version)
throws IOException
{
@@ -142,5 +148,12 @@ public class CoordinatorLogId implements Serializable
{
return TypeSizes.sizeof(logId.hostId) +
TypeSizes.sizeof(logId.hostLogId);
}
- };
+
+ public long serializedSize(long logId, int version)
+ {
+ return TypeSizes.sizeof(logId);
+ }
+ }
+
+ static final Serializer serializer = new Serializer();
}
diff --git a/src/java/org/apache/cassandra/replication/ForwardedWrite.java
b/src/java/org/apache/cassandra/replication/ForwardedWrite.java
index 51a072a423..41153e307f 100644
--- a/src/java/org/apache/cassandra/replication/ForwardedWrite.java
+++ b/src/java/org/apache/cassandra/replication/ForwardedWrite.java
@@ -402,7 +402,7 @@ public class ForwardedWrite
{
// Local mutations are witnessed from Keyspace.applyInternalTracked
if (msg != null)
-
MutationTrackingService.instance.witnessedRemoteMutation(keyspace, token, id,
msg.from());
+
MutationTrackingService.instance.receivedWriteResponse(keyspace, token, id,
msg.from());
// Local write needs to be ack'd to coordinator
if (msg == null && ackTo != null)
diff --git a/src/java/org/apache/cassandra/replication/MutationSummary.java
b/src/java/org/apache/cassandra/replication/MutationSummary.java
index 4f07d9ca62..072508c67f 100644
--- a/src/java/org/apache/cassandra/replication/MutationSummary.java
+++ b/src/java/org/apache/cassandra/replication/MutationSummary.java
@@ -255,6 +255,14 @@ public class MutationSummary
return count;
}
+ public int reconciledIds()
+ {
+ int count = 0;
+ for (CoordinatorSummary summary : summaries)
+ count += summary.reconciled.offsetCount();
+ return count;
+ }
+
public int size()
{
return summaries.size();
diff --git
a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index 01cc05274d..0237c19550 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -18,30 +18,44 @@
package org.apache.cassandra.replication;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.IntSupplier;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.agrona.collections.IntArrayList;
+import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
+import org.apache.cassandra.concurrent.Shutdownable;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.Verb;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.reads.tracked.ReadReconciliations;
import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.FBUtilities;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+import static
org.apache.cassandra.concurrent.ExecutorFactory.SimulatorSemantics.NORMAL;
+
// TODO (expected): persistence (handle restarts)
// TODO (expected): handle topology changes
public class MutationTrackingService
@@ -49,6 +63,7 @@ public class MutationTrackingService
private static final Logger logger =
LoggerFactory.getLogger(MutationTrackingService.class);
public static final MutationTrackingService instance = new
MutationTrackingService();
+ private final ReplicatedOffsetsBroadcaster broadcaster = new
ReplicatedOffsetsBroadcaster();
private final ReadReconciliations reconciliations = new
ReadReconciliations();
private final ConcurrentHashMap<String, KeyspaceShards> shards = new
ConcurrentHashMap<>();
@@ -67,6 +82,9 @@ public class MutationTrackingService
for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces())
if (keyspace.useMutationTracking())
shards.put(keyspace.name, KeyspaceShards.make(keyspace,
metadata, this::nextHostLogId));
+
+ broadcaster.start();
+
started = true;
}
@@ -92,10 +110,15 @@ public class MutationTrackingService
return id;
}
- public void witnessedRemoteMutation(String keyspace, Token token,
MutationId mutationId, InetAddressAndPort onHost)
+ public void receivedWriteResponse(String keyspace, Token token, MutationId
mutationId, InetAddressAndPort onHost)
{
Preconditions.checkArgument(!mutationId.isNone());
- getOrCreate(keyspace).witnessedRemoteMutation(token, mutationId,
onHost);
+ getOrCreate(keyspace).receivedWriteResponse(token, mutationId, onHost);
+ }
+
+ public void updateReplicatedOffsets(String keyspace, Range<Token> range,
List<Offsets> offsets, InetAddressAndPort onHost)
+ {
+ getOrCreate(keyspace).updateReplicatedOffsets(range, offsets, onHost);
}
public void startWriting(Mutation mutation)
@@ -125,6 +148,12 @@ public class MutationTrackingService
return createSummaryForRange(Range.makeRowRange(range), tableId,
includePending);
}
+ void forEachKeyspace(Consumer<KeyspaceShards> consumer)
+ {
+ for (KeyspaceShards keyspaceShards : shards.values())
+ consumer.accept(keyspaceShards);
+ }
+
private KeyspaceShards getOrCreate(TableId tableId)
{
//noinspection DataFlowIssue
@@ -161,11 +190,13 @@ public class MutationTrackingService
Preconditions.checkArgument(keyspace.params.replicationType.isTracked());
Map<Range<Token>, Shard> shards = new HashMap<>();
cluster.placements.get(keyspace.params.replication).writes.forEach((tokenRange,
forRange) -> {
- IntArrayList participants = new IntArrayList(forRange.size(),
IntArrayList.DEFAULT_NULL_VALUE);
- for (InetAddressAndPort endpoint : forRange.endpoints())
- participants.add(cluster.directory.peerId(endpoint).id());
- Shard shard = new Shard(keyspace.name, tokenRange,
cluster.myNodeId().id(), new Participants(participants),
forRange.lastModified(), logIdProvider);
- shards.put(tokenRange, shard);
+ if
(!forRange.endpoints().contains(FBUtilities.getBroadcastAddressAndPort()))
+ return;
+ IntArrayList participants = new IntArrayList(forRange.size(),
IntArrayList.DEFAULT_NULL_VALUE);
+ for (InetAddressAndPort endpoint : forRange.endpoints())
+ participants.add(cluster.directory.peerId(endpoint).id());
+ Shard shard = new Shard(keyspace.name, tokenRange,
cluster.myNodeId().id(), new Participants(participants),
forRange.lastModified(), logIdProvider);
+ shards.put(tokenRange, shard);
});
return new KeyspaceShards(keyspace.name, shards);
}
@@ -184,9 +215,14 @@ public class MutationTrackingService
return lookUp(token).nextId();
}
- void witnessedRemoteMutation(Token token, MutationId mutationId,
InetAddressAndPort onHost)
+ void receivedWriteResponse(Token token, MutationId mutationId,
InetAddressAndPort onHost)
+ {
+ lookUp(token).receivedWriteResponse(mutationId, onHost);
+ }
+
+ void updateReplicatedOffsets(Range<Token> range, List<Offsets>
offsets, InetAddressAndPort onHost)
{
- lookUp(token).witnessedRemoteMutation(mutationId, onHost);
+ shards.get(range).updateReplicatedOffsets(offsets, onHost);
}
void startWriting(Mutation mutation)
@@ -224,6 +260,12 @@ public class MutationTrackingService
});
}
+ void forEachShard(Consumer<Shard> consumer)
+ {
+ for (Shard shard : shards.values())
+ consumer.accept(shard);
+ }
+
Shard lookUp(Mutation mutation)
{
return lookUp(mutation.key());
@@ -242,4 +284,73 @@ public class MutationTrackingService
return shards.get(range);
}
}
+
+ // TODO (later): a more intelligent heuristic for offsets included in
broadcasts
+ private static class ReplicatedOffsetsBroadcaster implements Runnable,
Shutdownable
+ {
+ private static final ScheduledExecutorPlus executor =
+ executorFactory().scheduled("Replicated-Offsets-Broadcaster",
NORMAL);
+
+ // TODO (later): a more intelligent heuristic for scheduling broadcasts
+ private static final long BROADCAST_INTERVAL_MILLIS = 200;
+
+ void start()
+ {
+ executor.scheduleWithFixedDelay(this, BROADCAST_INTERVAL_MILLIS,
BROADCAST_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public boolean isTerminated()
+ {
+ return executor.isTerminated();
+ }
+
+ @Override
+ public void shutdown()
+ {
+ executor.shutdown();
+ }
+
+ @Override
+ public Object shutdownNow()
+ {
+ return executor.shutdownNow();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit units) throws
InterruptedException
+ {
+ return executor.awaitTermination(timeout, units);
+ }
+
+ @Override
+ public void run()
+ {
+ MutationTrackingService.instance.forEachKeyspace(this::run);
+ }
+
+ private void run(KeyspaceShards shards)
+ {
+ shards.forEachShard(this::run);
+ }
+
+ private void run(Shard shard)
+ {
+ ShardReplicatedOffsets replicatedOffsets =
shard.collectReplicatedOffsets();
+ if (replicatedOffsets.isEmpty())
+ return;
+
+ Message<ShardReplicatedOffsets> message =
Message.out(Verb.BROADCAST_LOG_OFFSETS, replicatedOffsets);
+
+ for (InetAddressAndPort target : shard.remoteReplicas())
+ if (FailureDetector.instance.isAlive(target))
+ MessagingService.instance().send(message, target);
+ }
+ }
+
+ @VisibleForTesting
+ public void broadcastOffsetsForTesting()
+ {
+ broadcaster.run();
+ }
}
diff --git a/src/java/org/apache/cassandra/replication/Offsets.java
b/src/java/org/apache/cassandra/replication/Offsets.java
index 70985dcc8b..98a00b39fa 100644
--- a/src/java/org/apache/cassandra/replication/Offsets.java
+++ b/src/java/org/apache/cassandra/replication/Offsets.java
@@ -309,6 +309,7 @@ public class Offsets
if (size == 0)
{
append(start, end);
+ onAdded.consume(logId, start, end);
return true;
}
@@ -488,24 +489,6 @@ public class Offsets
bounds[size++] = end;
}
- public void append(int offset)
- {
- if (size == 0)
- {
- append(offset, offset);
- return;
- }
-
- int tail = bounds[size - 1];
- if (offset <= tail)
- throw new IllegalArgumentException("Can't append " + offset + " to
" + tail);
-
- if (offset == tail + 1)
- bounds[size-1] = offset;
- else
- append(offset, offset);
- }
-
public RangeIterator rangeIterator()
{
int numRanges = rangeCount();
@@ -791,7 +774,7 @@ public class Offsets
case VALID:
state = computeNext();
if (!state.isFinished())
- break;;
+ break;
case FINISHED:
return false;
default:
diff --git a/src/java/org/apache/cassandra/replication/Participants.java
b/src/java/org/apache/cassandra/replication/Participants.java
index c665d63a99..494dd3adca 100644
--- a/src/java/org/apache/cassandra/replication/Participants.java
+++ b/src/java/org/apache/cassandra/replication/Participants.java
@@ -46,6 +46,11 @@ public class Participants
return idx;
}
+ boolean contains(int hostId)
+ {
+ return indexOf(hostId) >= 0;
+ }
+
int get(int idx)
{
if (idx < 0 || idx >= hosts.length)
diff --git a/src/java/org/apache/cassandra/replication/Shard.java
b/src/java/org/apache/cassandra/replication/Shard.java
index 72ff1119ca..cb110249a0 100644
--- a/src/java/org/apache/cassandra/replication/Shard.java
+++ b/src/java/org/apache/cassandra/replication/Shard.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.replication;
+import java.util.ArrayList;
+import java.util.List;
import java.util.function.IntSupplier;
import com.google.common.base.Preconditions;
@@ -30,6 +32,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.replication.CoordinatorLog.CoordinatorLogPrimary;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.membership.NodeId;
import org.jctools.maps.NonBlockingHashMapLong;
public class Shard
@@ -45,6 +48,8 @@ public class Shard
Shard(String keyspace, Range<Token> tokenRange, int localHostId,
Participants participants, Epoch sinceEpoch, IntSupplier logIdProvider)
{
+ Preconditions.checkArgument(participants.contains(localHostId));
+
this.keyspace = keyspace;
this.tokenRange = tokenRange;
this.localHostId = localHostId;
@@ -62,20 +67,27 @@ public class Shard
return currentLocalLog.nextId();
}
- void witnessedRemoteMutation(MutationId mutationId, InetAddressAndPort
onHost)
+ void receivedWriteResponse(MutationId mutationId, InetAddressAndPort
onHost)
+ {
+ int onHostId = ClusterMetadata.current().directory.peerId(onHost).id();
+ getOrCreate(mutationId).receivedWriteResponse(mutationId, onHostId);
+ }
+
+ void updateReplicatedOffsets(List<Offsets> offsets, InetAddressAndPort
onHost)
{
int onHostId = ClusterMetadata.current().directory.peerId(onHost).id();
- get(mutationId).witnessedRemoteMutation(mutationId, onHostId);
+ for (Offsets logOffsets : offsets)
+
getOrCreate(logOffsets.logId()).updateReplicatedOffsets(logOffsets, onHostId);
}
void startWriting(Mutation mutation)
{
- get(mutation.id()).startWriting(mutation);
+ getOrCreate(mutation.id()).startWriting(mutation);
}
void finishWriting(Mutation mutation)
{
- get(mutation.id()).finishWriting(mutation);
+ getOrCreate(mutation.id()).finishWriting(mutation);
}
void addSummaryForKey(Token token, boolean includePending,
MutationSummary.Builder builder)
@@ -94,6 +106,34 @@ public class Shard
});
}
+ List<InetAddressAndPort> remoteReplicas()
+ {
+ List<InetAddressAndPort> replicas = new
ArrayList<>(participants.size() - 1);
+ for (int i = 0, size = participants.size(); i < size; ++i)
+ {
+ int hostId = participants.get(i);
+ if (hostId != localHostId)
+ replicas.add(ClusterMetadata.current().directory.endpoint(new
NodeId(hostId)));
+ }
+ return replicas;
+ }
+
+ /**
+ * Collects replicated offsets for the logs owned by this coordinator on
this shard.
+ */
+ ShardReplicatedOffsets collectReplicatedOffsets()
+ {
+ List<Offsets> offsets = new ArrayList<>();
+ for (CoordinatorLog log : logs.values())
+ {
+ Offsets logOffsets = log.collectReplicatedOffsets();
+ if (logOffsets != null)
+ offsets.add(logOffsets);
+ }
+
+ return new ShardReplicatedOffsets(keyspace, tokenRange, offsets);
+ }
+
/**
* Creates a new coordinator log for this host. Primarily on Shard init
(node startup or topology change).
* Also on keyspace creation.
@@ -104,13 +144,18 @@ public class Shard
return new CoordinatorLog.CoordinatorLogPrimary(localHostId, logId,
participants);
}
- private CoordinatorLog get(MutationId mutationId)
+ private CoordinatorLog getOrCreate(MutationId mutationId)
{
Preconditions.checkArgument(!mutationId.isNone());
- return get(mutationId.logId());
+ return getOrCreate(mutationId.logId());
+ }
+
+ private CoordinatorLog getOrCreate(CoordinatorLogId logId)
+ {
+ return getOrCreate(logId.asLong());
}
- private CoordinatorLog get(long logId)
+ private CoordinatorLog getOrCreate(long logId)
{
CoordinatorLog log = logs.get(logId);
return log != null
diff --git
a/src/java/org/apache/cassandra/replication/ShardReplicatedOffsets.java
b/src/java/org/apache/cassandra/replication/ShardReplicatedOffsets.java
new file mode 100644
index 0000000000..7795d3abfc
--- /dev/null
+++ b/src/java/org/apache/cassandra/replication/ShardReplicatedOffsets.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.IVerbHandler;
+
+public class ShardReplicatedOffsets
+{
+ private static final Logger logger =
LoggerFactory.getLogger(ShardReplicatedOffsets.class);
+
+ private final String keyspace;
+ private final Range<Token> range;
+ private final List<Offsets> replicatedOffsets;
+
+ public ShardReplicatedOffsets(String keyspace, Range<Token> range,
List<Offsets> offsets)
+ {
+ this.keyspace = keyspace;
+ this.range = range;
+ this.replicatedOffsets = offsets;
+ }
+
+ boolean isEmpty()
+ {
+ return replicatedOffsets.isEmpty();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ShardReplicatedOffsets{" + keyspace + ", " + range + ", " +
replicatedOffsets + '}';
+ }
+
+ public static final IVerbHandler<ShardReplicatedOffsets> verbHandler =
message -> {
+ ShardReplicatedOffsets replicatedOffsets = message.payload;
+ logger.trace("Received replicated offsets {} from {}",
replicatedOffsets, message.from());
+
MutationTrackingService.instance.updateReplicatedOffsets(replicatedOffsets.keyspace,
+
replicatedOffsets.range,
+
replicatedOffsets.replicatedOffsets,
+
message.from());
+ };
+
+ public static final IVersionedSerializer<ShardReplicatedOffsets>
serializer = new IVersionedSerializer<>()
+ {
+ @Override
+ public void serialize(ShardReplicatedOffsets status, DataOutputPlus
out, int version) throws IOException
+ {
+ out.writeUTF(status.keyspace);
+ AbstractBounds.tokenSerializer.serialize(status.range, out,
version);
+ out.writeInt(status.replicatedOffsets.size());
+ for (Offsets logOffsets : status.replicatedOffsets)
+ Offsets.serializer.serialize(logOffsets, out, version);
+ }
+
+ @Override
+ public ShardReplicatedOffsets deserialize(DataInputPlus in, int
version) throws IOException
+ {
+ String keyspace = in.readUTF();
+ Range<Token> range = (Range<Token>)
AbstractBounds.tokenSerializer.deserialize(in, IPartitioner.global(), version);
+ int count = in.readInt();
+ List<Offsets> replicatedOffsets = new ArrayList<>(count);
+ for (int i = 0; i < count; ++i)
+ replicatedOffsets.add(Offsets.serializer.deserialize(in,
version));
+ return new ShardReplicatedOffsets(keyspace, range,
replicatedOffsets);
+ }
+
+ @Override
+ public long serializedSize(ShardReplicatedOffsets replicatedOffsets,
int version)
+ {
+ long size = 0;
+ size += TypeSizes.sizeof(replicatedOffsets.keyspace);
+ size +=
AbstractBounds.tokenSerializer.serializedSize(replicatedOffsets.range, version);
+ size +=
TypeSizes.sizeof(replicatedOffsets.replicatedOffsets.size());
+ for (Offsets logOffsets : replicatedOffsets.replicatedOffsets)
+ size += Offsets.serializer.serializedSize(logOffsets, version);
+ return size;
+ }
+ };
+}
diff --git a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
index 7ae7b35e77..ba58f4f1f7 100644
--- a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
+++ b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
@@ -168,10 +168,10 @@ public class TrackedWriteRequest
if (remoteDCReplicas == null)
remoteDCReplicas = new HashMap<>();
- List<Replica> messages = remoteDCReplicas.get(dc);
- if (messages == null)
- messages = remoteDCReplicas.computeIfAbsent(dc, ignore ->
new ArrayList<>(3)); // most DCs will have <= 3 replicas
- messages.add(destination);
+ List<Replica> replicas = remoteDCReplicas.get(dc);
+ if (replicas == null)
+ replicas = remoteDCReplicas.computeIfAbsent(dc, ignore ->
new ArrayList<>(3)); // most DCs will have <= 3 replicas
+ replicas.add(destination);
}
}
@@ -179,8 +179,8 @@ public class TrackedWriteRequest
applyMutationLocally(mutation, handler);
if (localDCReplicas != null)
- for (Replica destination : localDCReplicas)
- MessagingService.instance().sendWriteWithCallback(message,
destination, handler);
+ for (Replica replica : localDCReplicas)
+ MessagingService.instance().sendWriteWithCallback(message,
replica, handler);
if (remoteDCReplicas != null)
{
diff --git a/src/java/org/apache/cassandra/replication/Participants.java
b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
similarity index 51%
copy from src/java/org/apache/cassandra/replication/Participants.java
copy to src/java/org/apache/cassandra/replication/UnreconciledMutations.java
index c665d63a99..af26a9b19c 100644
--- a/src/java/org/apache/cassandra/replication/Participants.java
+++ b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
@@ -17,39 +17,25 @@
*/
package org.apache.cassandra.replication;
-import java.util.Arrays;
-import java.util.Collection;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.TableId;
-public class Participants
+/**
+ * Tracks unreconciled local mutations - the subset of all unreconciled
mutations
+ * that have been witnessed, or are currently being written to, on the local
node.
+ */
+interface UnreconciledMutations
{
- private final int[] hosts;
+ void startWriting(Mutation mutation);
- Participants(Collection<Integer> participants)
- {
- int i = 0;
- int[] hosts = new int[participants.size()];
- for (int host : participants) hosts[i++] = host;
- Arrays.sort(hosts);
- this.hosts = hosts;
- }
+ void finishWriting(Mutation mutation);
- int size()
- {
- return hosts.length;
- }
+ void remove(int offset);
- int indexOf(int hostId)
- {
- int idx = Arrays.binarySearch(hosts, hostId);
- if (idx < 0)
- throw new IllegalArgumentException("Unknown host id " + hostId);
- return idx;
- }
+ boolean collect(Token token, TableId tableId, boolean includePending,
Offsets into);
- int get(int idx)
- {
- if (idx < 0 || idx >= hosts.length)
- throw new IllegalArgumentException("Out of bounds host idx " +
idx);
- return hosts[idx];
- }
+ boolean collect(AbstractBounds<PartitionPosition> range, TableId tableId,
boolean includePending, Offsets into);
}
diff --git a/src/java/org/apache/cassandra/replication/LocalMutationStates.java
b/src/java/org/apache/cassandra/replication/UnreconciledMutationsReplica.java
similarity index 92%
rename from src/java/org/apache/cassandra/replication/LocalMutationStates.java
rename to
src/java/org/apache/cassandra/replication/UnreconciledMutationsReplica.java
index b3f8a2d347..ab7c47ed50 100644
--- a/src/java/org/apache/cassandra/replication/LocalMutationStates.java
+++
b/src/java/org/apache/cassandra/replication/UnreconciledMutationsReplica.java
@@ -37,7 +37,7 @@ import org.apache.cassandra.schema.TableId;
* Tracks unreconciled local mutations - the subset of all unreconciled
mutations
* that have been witnessed, or are currently being written to, on the local
node.
*/
-class LocalMutationStates
+class UnreconciledMutationsReplica implements UnreconciledMutations
{
private final Int2ObjectHashMap<Entry> statesMap = new
Int2ObjectHashMap<>();
private final SortedSet<Entry> statesSet = new TreeSet<>(Entry.comparator);
@@ -115,14 +115,16 @@ class LocalMutationStates
}
}
- void startWriting(Mutation mutation)
+ @Override
+ public void startWriting(Mutation mutation)
{
Entry entry = Entry.create(mutation);
statesMap.put(entry.offset, entry);
statesSet.add(entry);
}
- void finishWriting(Mutation mutation)
+ @Override
+ public void finishWriting(Mutation mutation)
{
Preconditions.checkArgument(!mutation.id().isNone());
Entry entry = statesMap.get(mutation.id().offset());
@@ -130,20 +132,23 @@ class LocalMutationStates
entry.visibility = Visibility.VISIBLE;
}
- void remove(int offset)
+ @Override
+ public void remove(int offset)
{
Entry state = statesMap.remove(offset);
Preconditions.checkNotNull(state);
statesSet.remove(state);
}
- boolean collect(Token token, TableId tableId, boolean includePending,
Offsets into)
+ @Override
+ public boolean collect(Token token, TableId tableId, boolean
includePending, Offsets into)
{
SortedSet<Entry> subset = statesSet.subSet(Entry.start(token, true),
Entry.end(token, true));
return collect(subset, tableId, includePending, into);
}
- boolean collect(AbstractBounds<PartitionPosition> range, TableId tableId,
boolean includePending, Offsets into)
+ @Override
+ public boolean collect(AbstractBounds<PartitionPosition> range, TableId
tableId, boolean includePending, Offsets into)
{
Entry start = Entry.start(range.left.getToken(), range.left.kind() !=
PartitionPosition.Kind.MAX_BOUND);
Entry end = Entry.end(range.right.getToken(), range.right.kind() !=
PartitionPosition.Kind.MIN_BOUND);
diff --git
a/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java
b/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java
index 56528f89c8..c3eb76a46d 100644
--- a/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java
@@ -56,7 +56,7 @@ public class TrackedWriteResponseHandler extends
AbstractWriteResponseHandler<No
{
// Local mutations are witnessed from Keyspace.applyInternalTracked
if (msg != null)
- MutationTrackingService.instance.witnessedRemoteMutation(keyspace,
token, mutationId, msg.from());
+ MutationTrackingService.instance.receivedWriteResponse(keyspace,
token, mutationId, msg.from());
wrapped.onResponse(msg);
}
diff --git
a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileNotify.java
b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileNotify.java
index e5c63c342c..48ff51b814 100644
---
a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileNotify.java
+++
b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileNotify.java
@@ -68,7 +68,7 @@ public class ReadReconcileNotify
}
};
- public static final IVersionedSerializer<ReadReconcileNotify> serializer =
new IVersionedSerializer<ReadReconcileNotify>()
+ public static final IVersionedSerializer<ReadReconcileNotify> serializer =
new IVersionedSerializer<>()
{
@Override
public void serialize(ReadReconcileNotify notify, DataOutputPlus out,
int version) throws IOException
diff --git
a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java
b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java
index 8ba7be954a..85bb0fbfee 100644
---
a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java
+++
b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java
@@ -37,7 +37,7 @@ import org.apache.cassandra.utils.Clock;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
-import static
org.apache.cassandra.concurrent.ExecutorFactory.SimulatorSemantics.DISCARD;
+import static
org.apache.cassandra.concurrent.ExecutorFactory.SimulatorSemantics.NORMAL;
import static org.apache.cassandra.utils.MonotonicClock.Global.preciseTime;
/**
@@ -51,7 +51,7 @@ public class ReadReconciliations implements Shutdownable
private static final Logger logger =
LoggerFactory.getLogger(ReadReconciliations.class);
private final ConcurrentMap<Long, Info> reconciliations = new
ConcurrentHashMap<>();
- private final ScheduledExecutorPlus executor =
executorFactory().scheduled("Reconciliation-Map-Reaper", DISCARD);
+ private final ScheduledExecutorPlus executor =
executorFactory().scheduled("Reconciliation-Map-Reaper", NORMAL);
private static final AtomicLong lastReconciliationId = new AtomicLong();
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java
b/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java
new file mode 100644
index 0000000000..5cd9c8cb2c
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.tracking;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.replication.MutationSummary;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static
org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils.getOnlyLogId;
+
+public class OffsetBroadcastTest extends TestBaseImpl
+{
+ private static final Logger logger =
LoggerFactory.getLogger(OffsetBroadcastTest.class);
+
+ @Test
+ public void testBroadcastOffsets() throws Throwable
+ {
+ try (Cluster cluster = Cluster.build(3)
+ .withConfig(cfg ->
cfg.with(Feature.NETWORK)
+
.with(Feature.GOSSIP)
+
.set("mutation_tracking_enabled", "true"))
+ .start())
+ {
+
+ cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH
replication = " +
+ "{'class': 'SimpleStrategy',
'replication_factor': 3} " +
+ "AND
replication_type='tracked';"));
+
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (k int
primary key, v int);"));
+
+ String keyspaceName = KEYSPACE;
+
+ cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl
(k, v) VALUES (1, 1)"), ConsistencyLevel.QUORUM);
+
+ for (int i = 1; i <= cluster.size(); ++i)
+ cluster.get(i).runOnInstance(() ->
MutationTrackingService.instance.broadcastOffsetsForTesting());
+
+ for (int i = 1; i <= cluster.size(); ++i)
+ {
+ cluster.get(i).runOnInstance(() -> {
+ TableMetadata table =
Schema.instance.getTableMetadata(keyspaceName, "tbl");
+ DecoratedKey dk =
Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1));
+ MutationSummary summary =
MutationTrackingService.instance.createSummaryForKey(dk, table.id, false);
+ MutationSummary.CoordinatorSummary coordinatorSummary =
summary.get(getOnlyLogId(summary));
+ Assert.assertEquals(1,
coordinatorSummary.reconciled.offsetCount());
+ Assert.assertEquals(0,
coordinatorSummary.unreconciled.offsetCount());
+ });
+ }
+ }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java
b/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java
index 72302634c9..64172b37e7 100644
--- a/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java
+++ b/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java
@@ -68,7 +68,7 @@ public class CoordinatorLogTest
{
Offsets list = new Offsets(LOG_ID);
for (MutationId id : ids)
- list.append(id.offset());
+ list.add(id.offset());
return list;
}
@@ -119,10 +119,10 @@ public class CoordinatorLogTest
// the call to finishWriting will have made the ids visible without
the includePending flag
assertUnreconciled(tk, tableId, log, false, reconciled, ids);
- log.witnessedRemoteMutation(ids[0], PARTICIPANTS.get(1));
+ log.receivedWriteResponse(ids[0], PARTICIPANTS.get(1));
assertUnreconciled(tk, tableId, log, false, reconciled, ids);
- log.witnessedRemoteMutation(ids[0], PARTICIPANTS.get(2));
+ log.receivedWriteResponse(ids[0], PARTICIPANTS.get(2));
reconciled.add(ids[0].offset());
assertUnreconciled(tk, tableId, log, false, reconciled, ids[1],
ids[2]);
}
diff --git a/test/unit/org/apache/cassandra/replication/OffsetsTest.java
b/test/unit/org/apache/cassandra/replication/OffsetsTest.java
index 9b39491b3d..0a94a2a646 100644
--- a/test/unit/org/apache/cassandra/replication/OffsetsTest.java
+++ b/test/unit/org/apache/cassandra/replication/OffsetsTest.java
@@ -504,22 +504,22 @@ public class OffsetsTest
public void appendTest()
{
Offsets ids = new Offsets(LOG_ID);
- ids.append(5);
+ ids.add(5);
assertEquals(1, ids.rangeCount());
assertEquals(1, ids.offsetCount());
- ids.append(6);
+ ids.add(6);
assertEquals(1, ids.rangeCount());
assertEquals(2, ids.offsetCount());
- ids.append(8);
+ ids.add(8);
assertEquals(2, ids.rangeCount());
assertEquals(3, ids.offsetCount());
// insert before tail
try
{
- ids.append(8);
+ ids.add(8);
Assert.fail();
}
catch (IllegalArgumentException e)
@@ -532,7 +532,7 @@ public class OffsetsTest
// insert before tail
try
{
- ids.append(7);
+ ids.add(7);
Assert.fail();
}
catch (IllegalArgumentException e)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]