This is an automated email from the ASF dual-hosted git repository.
aleksey pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-45-mutation-tracking by
this push:
new bd5a657548 Implement Shard and CoordinatorLog metadata durability
bd5a657548 is described below
commit bd5a657548f27a0b6a5f08f1452a39b4f14064cb
Author: Aleksey Yeschenko <[email protected]>
AuthorDate: Thu Jul 17 13:00:33 2025 +0100
Implement Shard and CoordinatorLog metadata durability
patch by Aleksey Yeschenko; reviewed by Abe Ratnofsky for CASSANDRA-20882
---
.../org/apache/cassandra/db/SystemKeyspace.java | 80 ++++--
.../cassandra/io/sstable/format/SSTableWriter.java | 2 +-
.../cassandra/replication/BroadcastLogOffsets.java | 12 +-
.../cassandra/replication/CoordinatorLog.java | 304 ++++++++++++++++-----
.../replication/MutationTrackingService.java | 227 ++++++++-------
.../cassandra/replication/Node2OffsetsMap.java | 89 +++++-
.../org/apache/cassandra/replication/Offsets.java | 33 +++
.../apache/cassandra/replication/Participants.java | 20 ++
.../org/apache/cassandra/replication/Shard.java | 188 ++++++++-----
.../replication/UnreconciledMutations.java | 38 +++
.../test/log/SystemKeyspaceStorageTest.java | 4 +-
.../cassandra/cql3/SystemKeyspaceQueryTest.java | 4 +-
.../replication/CoordinatorLogOffsetsTest.java | 8 +-
.../cassandra/replication/CoordinatorLogTest.java | 123 +++++++--
.../apache/cassandra/replication/OffsetsTest.java | 12 +-
.../apache/cassandra/replication/ShardTest.java | 112 ++++++++
.../replication/UnreconciledMutationsTest.java | 115 ++++----
17 files changed, 1043 insertions(+), 328 deletions(-)
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java
b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index a15b46665f..aa3834c4ff 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -188,7 +188,10 @@ public final class SystemKeyspace
public static final String REPAIRS = "repairs";
public static final String TOP_PARTITIONS = "top_partitions";
public static final String METADATA_LOG = "local_metadata_log";
- public static final String SNAPSHOT_TABLE_NAME = "metadata_snapshots";
+ public static final String METADATA_SNAPSHOTS = "metadata_snapshots";
+ public static final String HOST_LOG_ID = "host_log_id";
+ public static final String SHARDS = "shards";
+ public static final String COORDINATOR_LOGS = "coordinator_logs";
/**
* By default the system keyspace tables should be stored in a single data
directory to allow the server
@@ -223,14 +226,16 @@ public final class SystemKeyspace
TABLE_ESTIMATES_TYPE_LOCAL_PRIMARY, AVAILABLE_RANGES_V2,
TRANSFERRED_RANGES_V2, VIEW_BUILDS_IN_PROGRESS,
BUILT_VIEWS, PREPARED_STATEMENTS, REPAIRS, TOP_PARTITIONS,
LEGACY_PEERS, LEGACY_PEER_EVENTS,
LEGACY_TRANSFERRED_RANGES, LEGACY_AVAILABLE_RANGES,
LEGACY_SIZE_ESTIMATES, LEGACY_SSTABLE_ACTIVITY,
- METADATA_LOG, SNAPSHOT_TABLE_NAME, CONSENSUS_MIGRATION_STATE);
+ METADATA_LOG, METADATA_SNAPSHOTS, CONSENSUS_MIGRATION_STATE,
+ HOST_LOG_ID, SHARDS, COORDINATOR_LOGS);
public static final Set<String> TABLE_NAMES = ImmutableSet.of(
BATCHES, PAXOS, PAXOS_REPAIR_HISTORY, BUILT_INDEXES, LOCAL, PEERS_V2,
PEER_EVENTS_V2,
COMPACTION_HISTORY, SSTABLE_ACTIVITY_V2, TABLE_ESTIMATES,
AVAILABLE_RANGES_V2, TRANSFERRED_RANGES_V2, VIEW_BUILDS_IN_PROGRESS,
BUILT_VIEWS, PREPARED_STATEMENTS, REPAIRS, TOP_PARTITIONS,
LEGACY_PEERS, LEGACY_PEER_EVENTS,
LEGACY_TRANSFERRED_RANGES, LEGACY_AVAILABLE_RANGES,
LEGACY_SIZE_ESTIMATES, LEGACY_SSTABLE_ACTIVITY,
- METADATA_LOG, SNAPSHOT_TABLE_NAME, CONSENSUS_MIGRATION_STATE);
+ METADATA_LOG, METADATA_SNAPSHOTS, CONSENSUS_MIGRATION_STATE,
+ HOST_LOG_ID, SHARDS, COORDINATOR_LOGS);
public static final TableMetadata Batches =
parse(BATCHES,
@@ -516,7 +521,7 @@ public final class SystemKeyspace
+ "cfids set<uuid>, "
+ "PRIMARY KEY (parent_id))").build();
- public static final TableMetadata LocalMetadataLog =
+ private static final TableMetadata LocalMetadataLog =
parse(METADATA_LOG,
"Local Metadata Log",
"CREATE TABLE %s ("
@@ -530,13 +535,49 @@ public final class SystemKeyspace
"compaction_window_size","1")))
.build();
- public static final TableMetadata Snapshots = parse(SNAPSHOT_TABLE_NAME,
- "ClusterMetadata
snapshots",
- "CREATE TABLE IF NOT
EXISTS %s (" +
- "epoch bigint PRIMARY
KEY," +
- "snapshot blob)")
-
.partitioner(MetaStrategy.partitioner)
- .build();
+ private static final TableMetadata Snapshots =
+ parse(METADATA_SNAPSHOTS,
+ "ClusterMetadata snapshots",
+ "CREATE TABLE IF NOT EXISTS %s (" +
+ "epoch bigint PRIMARY KEY," +
+ "snapshot blob)")
+ .partitioner(MetaStrategy.partitioner)
+ .build();
+
+ private static final TableMetadata HostLogId =
+ parse(HOST_LOG_ID,
+ "mutation tracking last used host log id",
+ "CREATE TABLE %s ("
+ + "key text,"
+ + "host_log_id int,"
+ + "PRIMARY KEY ((key)))")
+ .build();
+
+ private static final TableMetadata Shards =
+ parse(SHARDS,
+ "mutation tracking shards",
+ "CREATE TABLE %s ("
+ + "keyspace_name text,"
+ + "range_start text,"
+ + "range_end text,"
+ + "participants frozen<set<int>>,"
+ + "PRIMARY KEY ((keyspace_name, range_start, range_end)))")
+ .build();
+
+ private static final TableMetadata CoordinatorLogs =
+ parse(COORDINATOR_LOGS,
+ "mutation tracking coordinator logs",
+ "CREATE TABLE %s ("
+ + "keyspace_name text,"
+ + "range_start text,"
+ + "range_end text,"
+ + "host_id int,"
+ + "host_log_id int,"
+ + "participants frozen<set<int>>,"
+ + "witnessed_offsets map<int, frozen<list<int>>>,"
+ + "persisted_offsets map<int, frozen<list<int>>>,"
+ + "PRIMARY KEY ((keyspace_name, range_start, range_end),
host_id, host_log_id))")
+ .build();
@Deprecated(since = "4.0")
private static final TableMetadata LegacyPeers =
@@ -631,7 +672,10 @@ public final class SystemKeyspace
TopPartitions,
LocalMetadataLog,
Snapshots,
- ConsensusMigrationState);
+ ConsensusMigrationState,
+ HostLogId,
+ Shards,
+ CoordinatorLogs);
}
private static volatile Map<TableId, Pair<CommitLogPosition, Long>>
truncationRecords;
@@ -2127,16 +2171,16 @@ public final class SystemKeyspace
{
Preconditions.checkArgument(epoch.isAfter(Epoch.FIRST), "Cannot store
a snapshot for an epoch less than " + Epoch.FIRST.getEpoch());
logger.info("Storing snapshot of cluster metadata at epoch {}", epoch);
- String query = String.format("INSERT INTO %s.%s (epoch, snapshot)
VALUES (?, ?)", SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME);
+ String query = String.format("INSERT INTO %s.%s (epoch, snapshot)
VALUES (?, ?)", SchemaConstants.SYSTEM_KEYSPACE_NAME, METADATA_SNAPSHOTS);
executeInternal(query, epoch.getEpoch(), snapshot);
- forceBlockingFlush(SNAPSHOT_TABLE_NAME);
+ forceBlockingFlush(METADATA_SNAPSHOTS);
}
public static ByteBuffer getSnapshot(Epoch epoch)
{
Preconditions.checkArgument(epoch.isAfter(Epoch.FIRST), "Cannot
retrieve a snapshot for an epoch less than " + Epoch.FIRST.getEpoch());
logger.info("Getting snapshot of epoch = {}", epoch);
- String query = String.format("SELECT snapshot FROM %s.%s WHERE epoch =
?", SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME);
+ String query = String.format("SELECT snapshot FROM %s.%s WHERE epoch =
?", SchemaConstants.SYSTEM_KEYSPACE_NAME, METADATA_SNAPSHOTS);
UntypedResultSet res = executeInternal(query, epoch.getEpoch());
if (res == null || res.isEmpty())
return null;
@@ -2165,7 +2209,7 @@ public final class SystemKeyspace
{
// during gossip upgrade we have epoch = Long.MIN_VALUE + 1 (and the
reverse partitioner doesn't support negative keys)
search = search.isBefore(Epoch.EMPTY) ? Epoch.EMPTY : search;
- String query = String.format("SELECT snapshot FROM %s.%s WHERE
token(epoch) >= token(?) LIMIT 1", SchemaConstants.SYSTEM_KEYSPACE_NAME,
SNAPSHOT_TABLE_NAME);
+ String query = String.format("SELECT snapshot FROM %s.%s WHERE
token(epoch) >= token(?) LIMIT 1", SchemaConstants.SYSTEM_KEYSPACE_NAME,
METADATA_SNAPSHOTS);
UntypedResultSet res = executeInternal(query, search.getEpoch());
if (res != null && !res.isEmpty())
return res.one().getBytes("snapshot").duplicate();
@@ -2176,7 +2220,7 @@ public final class SystemKeyspace
{
// during gossip upgrade we have epoch = Long.MIN_VALUE + 1 (and the
reverse partitioner doesn't support negative keys)
search = search.isBefore(Epoch.EMPTY) ? Epoch.EMPTY : search;
- String query = String.format("SELECT epoch FROM %s.%s WHERE
token(epoch) < token(?)", SchemaConstants.SYSTEM_KEYSPACE_NAME,
SNAPSHOT_TABLE_NAME);
+ String query = String.format("SELECT epoch FROM %s.%s WHERE
token(epoch) < token(?)", SchemaConstants.SYSTEM_KEYSPACE_NAME,
METADATA_SNAPSHOTS);
UntypedResultSet res = executeInternal(query, search.getEpoch());
if (res == null)
return Collections.emptyList();
@@ -2189,7 +2233,7 @@ public final class SystemKeyspace
*/
public static ByteBuffer findLastSnapshot()
{
- String query = String.format("SELECT snapshot FROM %s.%s LIMIT 1",
SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME);
+ String query = String.format("SELECT snapshot FROM %s.%s LIMIT 1",
SchemaConstants.SYSTEM_KEYSPACE_NAME, METADATA_SNAPSHOTS);
UntypedResultSet res = executeInternal(query);
if (res != null && !res.isEmpty())
return res.one().getBytes("snapshot").duplicate();
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 0190282fc7..bc183a56fa 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -343,7 +343,7 @@ public abstract class SSTableWriter extends SSTable
implements Transactional
if (metadata().replicationType().isTracked() && repairedAt ==
ActiveRepairService.UNREPAIRED_SSTABLE)
{
Preconditions.checkState(Objects.equals(pendingRepair,
ActiveRepairService.NO_PENDING_REPAIR));
- if
(MutationTrackingService.instance.isDurablyReconciled(getKeyspaceName(),
coordinatorLogOffsets))
+ if
(MutationTrackingService.instance.isDurablyReconciled(coordinatorLogOffsets))
{
repairedAt = Clock.Global.currentTimeMillis();
logger.debug("Marking SSTable {} as reconciled with repairedAt
{}", descriptor, repairedAt);
diff --git a/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java
b/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java
index b3fc118613..ce8fe512c9 100644
--- a/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java
+++ b/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java
@@ -41,12 +41,14 @@ public class BroadcastLogOffsets
private final String keyspace;
private final Range<Token> range;
private final List<Offsets.Immutable> replicatedOffsets;
+ private final boolean durable;
- public BroadcastLogOffsets(String keyspace, Range<Token> range,
List<Offsets.Immutable> offsets)
+ public BroadcastLogOffsets(String keyspace, Range<Token> range,
List<Offsets.Immutable> offsets, boolean durable)
{
this.keyspace = keyspace;
this.range = range;
this.replicatedOffsets = offsets;
+ this.durable = durable;
}
boolean isEmpty()
@@ -57,7 +59,7 @@ public class BroadcastLogOffsets
@Override
public String toString()
{
- return "ShardReplicatedOffsets{" + keyspace + ", " + range + ", " +
replicatedOffsets + '}';
+ return "ShardReplicatedOffsets{" + keyspace + ", " + range + ", " +
replicatedOffsets + ", " + durable + '}';
}
public static final IVerbHandler<BroadcastLogOffsets> verbHandler =
message -> {
@@ -66,6 +68,7 @@ public class BroadcastLogOffsets
MutationTrackingService.instance.updateReplicatedOffsets(replicatedOffsets.keyspace,
replicatedOffsets.range,
replicatedOffsets.replicatedOffsets,
+
replicatedOffsets.durable,
message.from());
};
@@ -79,6 +82,7 @@ public class BroadcastLogOffsets
out.writeInt(status.replicatedOffsets.size());
for (Offsets.Immutable logOffsets : status.replicatedOffsets)
Offsets.serializer.serialize(logOffsets, out, version);
+ out.writeBoolean(status.durable);
}
@Override
@@ -90,7 +94,8 @@ public class BroadcastLogOffsets
List<Offsets.Immutable> replicatedOffsets = new ArrayList<>(count);
for (int i = 0; i < count; ++i)
replicatedOffsets.add(Offsets.serializer.deserialize(in,
version));
- return new BroadcastLogOffsets(keyspace, range, replicatedOffsets);
+ boolean durable = in.readBoolean();
+ return new BroadcastLogOffsets(keyspace, range, replicatedOffsets,
durable);
}
@Override
@@ -102,6 +107,7 @@ public class BroadcastLogOffsets
size +=
TypeSizes.sizeof(replicatedOffsets.replicatedOffsets.size());
for (Offsets.Immutable logOffsets :
replicatedOffsets.replicatedOffsets)
size += Offsets.serializer.serializedSize(logOffsets, version);
+ size += TypeSizes.sizeof(replicatedOffsets.durable);
return size;
}
};
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index 13228256e2..312c355237 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -17,7 +17,11 @@
*/
package org.apache.cassandra.replication;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -27,74 +31,101 @@ import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.IntArrayList;
+import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.tcm.ClusterMetadata;
+import static java.lang.String.format;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.apache.cassandra.replication.Node2OffsetsMap.forParticipants;
+import static
org.apache.cassandra.replication.Node2OffsetsMap.fromPrimitiveMap;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
public abstract class CoordinatorLog
{
private static final Logger logger =
LoggerFactory.getLogger(CoordinatorLog.class);
- protected final int localHostId;
+ protected final int localNodeId;
+ protected final String keyspace;
+ protected final Range<Token> range;
protected final CoordinatorLogId logId;
protected final Participants participants;
+ protected final Node2OffsetsMap witnessedOffsets;
+ protected final Node2OffsetsMap persistedOffsets;
+
protected final UnreconciledMutations unreconciledMutations;
- protected final Offsets.Mutable[] witnessedOffsets;
protected final Offsets.Mutable reconciledOffsets;
+ protected final Offsets.Mutable reconciledPersistedOffsets;
protected final ReadWriteLock lock;
- abstract void receivedWriteResponse(ShortMutationId mutationId, int
fromHostId);
+ abstract void receivedWriteResponse(ShortMutationId mutationId, int
fromNodeId);
- CoordinatorLog(int localHostId, CoordinatorLogId logId, Participants
participants)
+ CoordinatorLog(String keyspace,
+ Range<Token> range,
+ int localNodeId,
+ CoordinatorLogId logId,
+ Participants participants,
+ Node2OffsetsMap witnessedOffsets,
+ Node2OffsetsMap persistedOffsets,
+ UnreconciledMutations unreconciledMutations)
{
- this.localHostId = localHostId;
+ this.localNodeId = localNodeId;
+ this.keyspace = keyspace;
+ this.range = range;
this.logId = logId;
this.participants = participants;
-
- this.unreconciledMutations = new UnreconciledMutations();
+ this.unreconciledMutations = unreconciledMutations;
+ this.witnessedOffsets = witnessedOffsets;
+ this.reconciledOffsets = witnessedOffsets.intersection();
+ this.persistedOffsets = persistedOffsets;
+ this.reconciledPersistedOffsets = persistedOffsets.intersection();
this.lock = new ReentrantReadWriteLock();
+ }
- Offsets.Mutable[] ids = new Offsets.Mutable[participants.size()];
- for (int i = 0; i < participants.size(); i++)
- ids[i] = new Offsets.Mutable(logId);
+ CoordinatorLog(String keyspace, Range<Token> range, int localNodeId,
CoordinatorLogId logId, Participants participants)
+ {
+ this(keyspace, range, localNodeId, logId, participants,
forParticipants(logId, participants), forParticipants(logId, participants), new
UnreconciledMutations());
+ }
- witnessedOffsets = ids;
- reconciledOffsets = new Offsets.Mutable(logId);
+ static CoordinatorLog create(String keyspace, Range<Token> range, int
localNodeId, CoordinatorLogId id, Participants participants)
+ {
+ return id.hostId == localNodeId ? new CoordinatorLogPrimary(keyspace,
range, localNodeId, id, participants)
+ : new CoordinatorLogReplica(keyspace,
range, localNodeId, id, participants);
}
- static CoordinatorLog create(int localHostId, CoordinatorLogId id,
Participants participants)
+ static CoordinatorLog recreate(
+ String keyspace, Range<Token> range, int localNodeId, CoordinatorLogId
id, Participants participants,
+ Node2OffsetsMap witnessedOffsets, Node2OffsetsMap persistedOffsets,
UnreconciledMutations unreconciledMutations)
{
- return id.hostId == localHostId ? new
CoordinatorLogPrimary(localHostId, id, participants)
- : new
CoordinatorLogReplica(localHostId, id, participants);
+ return id.hostId == localNodeId ? new CoordinatorLogPrimary(keyspace,
range, localNodeId, id, participants, witnessedOffsets, persistedOffsets,
unreconciledMutations)
+ : new CoordinatorLogReplica(keyspace,
range, localNodeId, id, participants, witnessedOffsets, persistedOffsets,
unreconciledMutations);
}
- void updateReplicatedOffsets(Offsets offsets, int onHostId)
+ void updateReplicatedOffsets(Offsets offsets, boolean persisted, int
onNodeId)
{
lock.writeLock().lock();
try
{
- get(onHostId).addAll(offsets, (ignore, start, end) ->
- {
- for (int offset = start; offset <= end; ++offset)
- {
- // TODO (desired): use the fact that Offsets are ordered
to optimise this look up
- if (othersWitnessed(offset, onHostId))
- {
- reconciledOffsets.add(offset);
- unreconciledMutations.remove(offset);
- }
- }
- });
+ if (persisted)
+ updatePersistedReplicatedOffsets(offsets, onNodeId);
+ else
+ updateWitnessedReplicatedOffsets(offsets, onNodeId);
}
finally
{
@@ -102,13 +133,37 @@ public abstract class CoordinatorLog
}
}
+ private void updateWitnessedReplicatedOffsets(Offsets offsets, int
onNodeId)
+ {
+ witnessedOffsets.get(onNodeId).addAll(offsets, (ignore, start, end) ->
+ {
+ for (int offset = start; offset <= end; ++offset)
+ {
+ // TODO (desired): use the fact that Offsets are ordered to
optimise this look up
+ if (othersWitnessed(offset, onNodeId))
+ {
+ reconciledOffsets.add(offset);
+ unreconciledMutations.remove(offset);
+ }
+ }
+ });
+ }
+
+ private void updatePersistedReplicatedOffsets(Offsets offsets, int
onNodeId)
+ {
+ persistedOffsets.get(onNodeId).addAll(offsets);
+ reconciledPersistedOffsets.addAll(persistedOffsets.intersection());
+ }
+
@Nullable
- Offsets.Immutable collectReplicatedOffsets()
+ Offsets.Immutable collectReplicatedOffsets(boolean persisted)
{
lock.readLock().lock();
try
{
- Offsets offsets =
witnessedOffsets[participants.indexOf(localHostId)];
+ Offsets offsets = persisted
+ ? persistedOffsets.get(localNodeId)
+ : witnessedOffsets.get(localNodeId);
return offsets.isEmpty() ? null : Offsets.Immutable.copy(offsets);
}
finally
@@ -122,7 +177,7 @@ public abstract class CoordinatorLog
lock.writeLock().lock();
try
{
- if (getLocal().contains(mutation.id().offset()))
+ if
(witnessedOffsets.get(localNodeId).contains(mutation.id().offset()))
return false; // already witnessed; shouldn't get to this path
often (duplicate mutation)
unreconciledMutations.startWriting(mutation);
@@ -143,7 +198,7 @@ public abstract class CoordinatorLog
{
int offset = mutation.id().offset();
// we've raced with another write, no need to do anything else
- if (!getLocal().add(offset))
+ if (!witnessedOffsets.get(localNodeId).add(offset))
return;
unreconciledMutations.finishWriting(mutation);
@@ -160,12 +215,12 @@ public abstract class CoordinatorLog
}
}
- private boolean othersWitnessed(int offset, int exceptHostId)
+ private boolean othersWitnessed(int offset, int exceptNodeId)
{
for (int i = 0; i < participants.size(); ++i)
{
- int hostId = participants.get(i);
- if (hostId != exceptHostId && !get(hostId).contains(offset))
+ int nodeId = participants.get(i);
+ if (nodeId != exceptNodeId &&
!witnessedOffsets.get(nodeId).contains(offset))
return false;
}
return true;
@@ -173,7 +228,7 @@ public abstract class CoordinatorLog
protected boolean remoteReplicasWitnessed(int offset)
{
- return othersWitnessed(offset, localHostId);
+ return othersWitnessed(offset, localNodeId);
}
/**
@@ -220,7 +275,7 @@ public abstract class CoordinatorLog
lock.readLock().lock();
try
{
- into.add(Offsets.Immutable.difference(remoteOffsets, getLocal()));
+ into.add(Offsets.Immutable.difference(remoteOffsets,
witnessedOffsets.get(localNodeId)));
}
finally
{
@@ -233,9 +288,11 @@ public abstract class CoordinatorLog
lock.readLock().lock();
try
{
- remoteNodeIds.forEachInt(remoteNodeId -> {
- Offsets missing =
Offsets.Immutable.difference(get(remoteNodeId), localOffsets);
- if (!missing.isEmpty()) into.add(remoteNodeId, missing);
+ remoteNodeIds.forEachInt(remoteNodeId ->
+ {
+ Offsets missing =
Offsets.Immutable.difference(witnessedOffsets.get(remoteNodeId), localOffsets);
+ if (!missing.isEmpty())
+ into.add(remoteNodeId, missing);
});
}
finally
@@ -244,18 +301,12 @@ public abstract class CoordinatorLog
}
}
- protected Offsets.Mutable get(int hostId)
- {
- return witnessedOffsets[participants.indexOf(hostId)];
- }
-
boolean isDurablyReconciled(CoordinatorLogOffsets<?> logOffsets)
{
lock.readLock().lock();
try
{
- // TODO: reconciledOffsets not necessarily durable, update once
durability is implemented
- Offsets.RangeIterator durablyReconciled =
reconciledOffsets.rangeIterator();
+ Offsets.RangeIterator durablyReconciled =
reconciledPersistedOffsets.rangeIterator();
Offsets.RangeIterator difference =
Offsets.difference(logOffsets.offsets(logId.asLong()).rangeIterator(),
durablyReconciled);
return !difference.tryAdvance();
}
@@ -265,17 +316,12 @@ public abstract class CoordinatorLog
}
}
- protected Offsets.Mutable getLocal()
- {
- return witnessedOffsets[participants.indexOf(localHostId)];
- }
-
@Override
public String toString()
{
return "CoordinatorLog{" +
"logId=" + logId +
- ", localHostId=" + localHostId +
+ ", localNodeId=" + localNodeId +
", participants=" + participants +
'}';
}
@@ -284,24 +330,31 @@ public abstract class CoordinatorLog
{
private final AtomicLong sequenceId = new AtomicLong(-1);
- CoordinatorLogPrimary(int localHostId, CoordinatorLogId logId,
Participants participants)
+ CoordinatorLogPrimary(
+ String keyspace, Range<Token> range, int localNodeId,
CoordinatorLogId logId, Participants participants,
+ Node2OffsetsMap witnessedOffsets, Node2OffsetsMap
persistedOffsets, UnreconciledMutations unreconciledMutations)
{
- super(localHostId, logId, participants);
+ super(keyspace, range, localNodeId, logId, participants,
witnessedOffsets, persistedOffsets, unreconciledMutations);
+ }
+
+ CoordinatorLogPrimary(String keyspace, Range<Token> range, int
localNodeId, CoordinatorLogId logId, Participants participants)
+ {
+ super(keyspace, range, localNodeId, logId, participants);
}
@Override
- void receivedWriteResponse(ShortMutationId mutationId, int fromHostId)
+ void receivedWriteResponse(ShortMutationId mutationId, int fromNodeId)
{
Preconditions.checkArgument(!mutationId.isNone());
- Preconditions.checkArgument(!Objects.equals(fromHostId,
ClusterMetadata.current().myNodeId().id()));
- logger.trace("witnessed remote mutation {} from {}", mutationId,
fromHostId);
+ Preconditions.checkArgument(!Objects.equals(fromNodeId,
ClusterMetadata.current().myNodeId().id()));
+ logger.trace("witnessed remote mutation {} from {}", mutationId,
fromNodeId);
lock.writeLock().lock();
try
{
- if (!get(fromHostId).add(mutationId.offset()))
+ if (!witnessedOffsets.get(fromNodeId).add(mutationId.offset()))
return; // already witnessed; very uncommon but possible
path
- if (!getLocal().contains(mutationId.offset()))
+ if
(!witnessedOffsets.get(localNodeId).contains(mutationId.offset()))
return; // local host hasn't witnessed yet -> no cleanup
needed
if (remoteReplicasWitnessed(mutationId.offset()))
@@ -318,9 +371,13 @@ public abstract class CoordinatorLog
}
}
+ @Nullable
MutationId nextId()
{
- return new MutationId(logId.asLong(), nextSequenceId());
+ long nextSequenceId = nextSequenceId();
+ return nextSequenceId >= 0
+ ? new MutationId(logId.asLong(), nextSequenceId)
+ : null;
}
private long nextSequenceId()
@@ -331,6 +388,10 @@ public abstract class CoordinatorLog
int prevOffset = MutationId.offset(prev);
int prevTimestamp = MutationId.timestamp(prev);
+ // int overflow
+ if (prevOffset == MAX_OFFSET)
+ return -1;
+
int nextOffset = prevOffset + 1;
int nextTimestamp = Math.max(prevTimestamp + 1, (int)
(currentTimeMillis() / 1000L));
long next = MutationId.sequenceId(nextOffset, nextTimestamp);
@@ -343,16 +404,131 @@ public abstract class CoordinatorLog
static class CoordinatorLogReplica extends CoordinatorLog
{
- CoordinatorLogReplica(int localHostId, CoordinatorLogId logId,
Participants participants)
+ CoordinatorLogReplica(
+ String keyspace, Range<Token> range, int localNodeId,
CoordinatorLogId logId, Participants participants,
+ Node2OffsetsMap witnessedOffsets, Node2OffsetsMap
persistedOffsets, UnreconciledMutations unreconciledMutations)
{
- super(localHostId, logId, participants);
+ super(keyspace, range, localNodeId, logId, participants,
witnessedOffsets, persistedOffsets, unreconciledMutations);
+ }
+
+ CoordinatorLogReplica(String keyspace, Range<Token> range, int
localNodeId, CoordinatorLogId logId, Participants participants)
+ {
+ super(keyspace, range, localNodeId, logId, participants);
}
@Override
- void receivedWriteResponse(ShortMutationId mutationId, int fromHostId)
+ void receivedWriteResponse(ShortMutationId mutationId, int fromNodeId)
{
// no-op
}
}
+ /*
+ * Persist to / load from system table.
+ */
+
+ private static final String INSERT_QUERY =
+ format("INSERT INTO %s.%s (keyspace_name, range_start, range_end,
host_id, host_log_id, participants, witnessed_offsets, persisted_offsets) "
+ + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
+ SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.COORDINATOR_LOGS);
+
+ void persistToSystemTable()
+ {
+ Map<Integer, List<Integer>> witnessed = new Int2ObjectHashMap<>();
+ Map<Integer, List<Integer>> persisted = new Int2ObjectHashMap<>();
+
+ lock.readLock().lock();
+ try
+ {
+ witnessedOffsets.convertToPrimitiveMap(witnessed);
+ persistedOffsets.convertToPrimitiveMap(persisted);
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ executeInternal(INSERT_QUERY, keyspace, range.left.toString(),
range.right.toString(), logId.hostId,
+ logId.hostLogId, participants.asSet(), witnessed,
persisted);
+ }
+
+ void updateLogsInSystemTable()
+ {
+ Offsets.Mutable localWitnessed;
+ Map<Integer, List<Integer>> witnessed = new Int2ObjectHashMap<>();
+ Map<Integer, List<Integer>> persisted = new Int2ObjectHashMap<>();
+
+ lock.readLock().lock();
+ try
+ {
+ localWitnessed =
Offsets.Mutable.copy(witnessedOffsets.get(localNodeId));
+
+ witnessedOffsets.convertToPrimitiveMap(witnessed);
+ persistedOffsets.convertToPrimitiveMap(persisted);
+
+ persisted.put(localNodeId, witnessed.get(localNodeId));
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+
+ executeInternal(INSERT_QUERY, keyspace, range.left.toString(),
range.right.toString(), logId.hostId,
+ logId.hostLogId, participants.asSet(), witnessed,
persisted);
+
+ lock.writeLock().lock();
+ try
+ {
+ persistedOffsets.set(localNodeId, localWitnessed);
+ reconciledPersistedOffsets.addAll(persistedOffsets.intersection());
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private static final String SELECT_QUERY =
+ format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND range_start =
? AND range_end = ?",
+ SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.COORDINATOR_LOGS);
+
+ static List<CoordinatorLog> loadFromSystemTable(String keyspace,
Range<Token> range, int localNodeId)
+ {
+ ArrayList<CoordinatorLog> logs = new ArrayList<>();
+ for (UntypedResultSet.Row row : executeInternal(SELECT_QUERY,
keyspace, range.left.toString(), range.right.toString()))
+ {
+ int nodeId = row.getInt("host_id");
+ int hostLogId = row.getInt("host_log_id");
+ CoordinatorLogId logId = new CoordinatorLogId(nodeId, hostLogId);
+ Set<Integer> participants = row.getFrozenSet("participants",
Int32Type.instance);
+ Map<Integer, List<Integer>> witnessedOffsets =
+ row.getMap("witnessed_offsets", Int32Type.instance,
ListType.getInstance(Int32Type.instance, false));
+ Map<Integer, List<Integer>> persistedOffsets =
+ row.getMap("persisted_offsets", Int32Type.instance,
ListType.getInstance(Int32Type.instance, false));
+ Node2OffsetsMap witnessed = fromPrimitiveMap(logId,
witnessedOffsets);
+ Node2OffsetsMap persisted = fromPrimitiveMap(logId,
persistedOffsets);
+ UnreconciledMutations unreconciled =
UnreconciledMutations.loadFromJournal(witnessed, localNodeId);
+ CoordinatorLog log =
+ CoordinatorLog.recreate(keyspace, range, localNodeId, logId,
new Participants(participants), witnessed, persisted, unreconciled);
+ logs.add(log);
+ }
+ return logs;
+ }
+
+ private static final String DELETE_QUERY =
+ format("DELETE FROM %s.%s WHERE keyspace_name = ? AND range_start = ?
AND range_end = ? AND host_id = ? AND host_log_id = ?",
+ SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.COORDINATOR_LOGS);
+
+ void deleteFromSystemTable()
+ {
+ executeInternal(DELETE_QUERY, keyspace, range.left.toString(),
range.right.toString(), logId.hostId, logId.hostLogId);
+ }
+
+    @VisibleForTesting
+    static void overrideMaxOffsetForTesting(int newMaxOffset)
+    {
+        MAX_OFFSET = newMaxOffset;
+    }
+ // don't make volatile unless it genuinely is an issue for some test,
+ // otherwise it should be *fine* as is, and slight overkill to make
volatile
+ private static int MAX_OFFSET = Integer.MAX_VALUE;
}
diff --git
a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index 7ec6758707..f780a35bfa 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.replication;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -26,10 +27,9 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
-import java.util.function.IntSupplier;
+import java.util.function.LongSupplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -40,10 +40,11 @@ import com.google.common.base.Preconditions;
import org.agrona.collections.IntArrayList;
import org.agrona.collections.IntHashSet;
import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
-import org.apache.cassandra.concurrent.Shutdownable;
+import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Splitter;
@@ -55,10 +56,13 @@ import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.ReplicationParams;
import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.reads.tracked.TrackedLocalReads;
import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.ownership.ReplicaGroups;
import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
import org.apache.cassandra.utils.FBUtilities;
@@ -66,14 +70,17 @@ import org.apache.cassandra.utils.FBUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.lang.String.format;
import static
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
import static
org.apache.cassandra.concurrent.ExecutorFactory.SimulatorSemantics.NORMAL;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
// TODO (expected): persistence (handle restarts)
// TODO (expected): handle topology changes
public class MutationTrackingService
{
+ private static final ScheduledExecutorPlus executor =
executorFactory().scheduled("Mutation-Tracking-Service", NORMAL);
+
/**
* Split ranges into this many shards.
*
@@ -89,6 +96,7 @@ public class MutationTrackingService
private final ConcurrentHashMap<CoordinatorLogId, Shard> log2ShardMap =
new ConcurrentHashMap<>();
private final ReplicatedOffsetsBroadcaster offsetsBroadcaster = new
ReplicatedOffsetsBroadcaster();
+ private final LogStatePersister offsetsPersister = new LogStatePersister();
private final ActiveLogReconciler activeReconciler = new
ActiveLogReconciler();
private final IncomingMutations incomingMutations = new
IncomingMutations();
@@ -104,13 +112,16 @@ public class MutationTrackingService
if (started)
return;
- logger.info("Starting replication tracking service");
+ prevHostLogId = loadHostLogIdFromSystemTable();
+
+ logger.info("Starting mutation tracking service. Previous host log id:
{}", prevHostLogId);
- for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces())
- if (keyspace.useMutationTracking())
- keyspaceShards.put(keyspace.name,
KeyspaceShards.make(keyspace, metadata, this::nextHostLogId, this::onNewLog));
+ if (metadata.myNodeId() != null)
+ for (KeyspaceShards ks :
KeyspaceShards.loadFromSystemTables(metadata, this::nextLogId, this::onNewLog))
+ keyspaceShards.put(ks.keyspace, ks);
offsetsBroadcaster.start();
+ offsetsPersister.start();
ExpiredStatePurger.instance.register(incomingMutations);
@@ -124,9 +135,10 @@ public class MutationTrackingService
public void shutdownBlocking() throws InterruptedException
{
- offsetsBroadcaster.shutdown();
- offsetsBroadcaster.awaitTermination(1, TimeUnit.MINUTES);
+ // TODO: FIXME
activeReconciler.shutdownBlocking();
+ executor.shutdown();
+ executor.awaitTermination(1, TimeUnit.MINUTES);
ExpiredStatePurger.instance.shutdownBlocking();
}
@@ -165,9 +177,9 @@ public class MutationTrackingService
activeReconciler.schedule(mutationId, onHost,
ActiveLogReconciler.Priority.REGULAR);
}
- public void updateReplicatedOffsets(String keyspace, Range<Token> range,
List<? extends Offsets> offsets, InetAddressAndPort onHost)
+ public void updateReplicatedOffsets(String keyspace, Range<Token> range,
List<? extends Offsets> offsets, boolean durable, InetAddressAndPort onHost)
{
- getOrCreateShards(keyspace).updateReplicatedOffsets(range, offsets,
onHost);
+ getOrCreateShards(keyspace).updateReplicatedOffsets(range, offsets,
durable, onHost);
}
public boolean startWriting(Mutation mutation)
@@ -264,41 +276,41 @@ public class MutationTrackingService
ClusterMetadata csm = ClusterMetadata.current();
KeyspaceMetadata ksm = csm.schema.getKeyspaceMetadata(keyspace);
- return keyspaceShards.computeIfAbsent(keyspace, ignore ->
KeyspaceShards.make(ksm, csm, this::nextHostLogId, this::onNewLog));
+ return keyspaceShards.computeIfAbsent(keyspace, ignore ->
KeyspaceShards.make(ksm, csm, this::nextLogId, this::onNewLog));
+ }
+
+ private long nextLogId()
+ {
+ NodeId nodeId = ClusterMetadata.current().myNodeId();
+ Preconditions.checkNotNull(nodeId);
+ return CoordinatorLogId.asLong(nodeId.id(), nextHostLogId());
}
- // TODO (expected): durability
- int nextHostLogId()
+ /*
+ * Allocate and persist the next host log id.
+ * We only do this on startup and when rotating logs.
+ */
+ private int nextHostLogId()
{
- return nextHostLogId.incrementAndGet();
+ int nextHostLogId = ++prevHostLogId;
+ persistHostLogIdToSystemTable(nextHostLogId);
+ return nextHostLogId;
}
- private final AtomicInteger nextHostLogId = new AtomicInteger();
+ private int prevHostLogId;
- public boolean isDurablyReconciled(String keyspace,
ImmutableCoordinatorLogOffsets logOffsets)
+ public boolean isDurablyReconciled(ImmutableCoordinatorLogOffsets
logOffsets)
{
// Could pass through SSTable bounds to exclude shards for
non-overlapping ranges, but this will mostly be
// called on flush for L0 SSTables with wide bounds.
-
- KeyspaceShards shards = keyspaceShards.get(keyspace);
- if (shards == null)
- {
- logger.debug("Could not find shards for keyspace {}", keyspace);
- return false;
- }
-
for (Long logId : logOffsets)
{
- CoordinatorLogId coordinatorLogId = new CoordinatorLogId(logId);
- CoordinatorLog log = shards.logs.get(coordinatorLogId);
- if (log == null)
- {
- logger.warn("Could not determine lifecycle for unknown logId
{}, not marking as durably reconciled", coordinatorLogId);
- return false;
- }
- if (!log.isDurablyReconciled(logOffsets))
+ Shard shard = getShardNullable(new CoordinatorLogId(logId));
+ if (shard == null)
+ throw new IllegalStateException("Could not find shard for
logId " + logId);
+
+ if (!shard.isDurablyReconciled(logId, logOffsets))
return false;
}
-
return true;
}
@@ -308,17 +320,15 @@ public class MutationTrackingService
log2ShardMap.put(log.logId, shard);
}
- private static class KeyspaceShards implements Shard.Subscriber
+ private static class KeyspaceShards
{
private final String keyspace;
private final Map<Range<Token>, Shard> shards;
private final ReplicaGroups groups;
- private final BiConsumer<Shard, CoordinatorLog> onNewLog;
private transient final Map<Range<PartitionPosition>, Shard> ppShards;
- private transient final Map<CoordinatorLogId, CoordinatorLog> logs;
- static KeyspaceShards make(KeyspaceMetadata keyspace, ClusterMetadata
cluster, IntSupplier logIdProvider, BiConsumer<Shard, CoordinatorLog> onNewLog)
+ static KeyspaceShards make(KeyspaceMetadata keyspace, ClusterMetadata
cluster, LongSupplier logIdProvider, BiConsumer<Shard, CoordinatorLog> onNewLog)
{
Preconditions.checkArgument(keyspace.params.replicationType.isTracked());
@@ -341,26 +351,23 @@ public class MutationTrackingService
for (Range<Token> tokenRange : ranges)
{
- shards.put(tokenRange, new Shard(keyspace.name,
tokenRange, cluster.myNodeId().id(), participants, forRange.lastModified(),
logIdProvider, onNewLog));
+ shards.put(tokenRange, new Shard(cluster.myNodeId().id(),
keyspace.name, tokenRange, participants, logIdProvider, onNewLog));
groups.put(tokenRange, forRange.map(original ->
original.withRange(tokenRange)));
}
});
- return new KeyspaceShards(keyspace.name, shards, new
ReplicaGroups(groups), onNewLog);
+ KeyspaceShards keyspaceShards = new KeyspaceShards(keyspace.name,
shards, new ReplicaGroups(groups));
+ keyspaceShards.persistToSystemTables();
+ return keyspaceShards;
}
- KeyspaceShards(String keyspace, Map<Range<Token>, Shard> shards,
ReplicaGroups groups, BiConsumer<Shard, CoordinatorLog> onNewLog)
+ KeyspaceShards(String keyspace, Map<Range<Token>, Shard> shards,
ReplicaGroups groups)
{
this.keyspace = keyspace;
this.shards = shards;
this.groups = groups;
- this.onNewLog = onNewLog;
- this.logs = new HashMap<>();
HashMap<Range<PartitionPosition>, Shard> ppShards = new
HashMap<>();
- shards.forEach((range, shard) -> {
- ppShards.put(Range.makeRowRange(range), shard);
- shard.addSubscriber(this);
- });
+ shards.forEach((range, shard) ->
ppShards.put(Range.makeRowRange(range), shard));
this.ppShards = ppShards;
}
@@ -369,9 +376,9 @@ public class MutationTrackingService
return lookUp(token).nextId();
}
- void updateReplicatedOffsets(Range<Token> range, List<? extends
Offsets> offsets, InetAddressAndPort onHost)
+ void updateReplicatedOffsets(Range<Token> range, List<? extends
Offsets> offsets, boolean durable, InetAddressAndPort onHost)
{
- shards.get(range).updateReplicatedOffsets(offsets, onHost);
+ shards.get(range).updateReplicatedOffsets(offsets, durable,
onHost);
}
boolean startWriting(Mutation mutation)
@@ -430,66 +437,105 @@ public class MutationTrackingService
return shards.get(groups.forRange(token).range());
}
- @Override
- public void onLogCreation(CoordinatorLog log)
+ void persistToSystemTables()
{
- logger.debug("Indexing created log {}", log);
- logs.put(log.logId, log);
+ for (Shard shard : shards.values()) shard.persistToSystemTables();
}
- @Override
- public void onSubscribe(CoordinatorLog currentLog)
+ static List<KeyspaceShards> loadFromSystemTables(ClusterMetadata
cluster, LongSupplier logIdProvider, BiConsumer<Shard, CoordinatorLog> onNewLog)
{
- logger.debug("Indexing current log {}", currentLog);
- logs.put(currentLog.logId, currentLog);
+ Map<String, Map<Range<Token>, Shard>> groupedShards = new
HashMap<>();
+ for (Shard shard :
Shard.loadFromSystemTables(cluster.myNodeId().id(), logIdProvider, onNewLog))
+ groupedShards.computeIfAbsent(shard.keyspace, k -> new
HashMap<>()).put(shard.range, shard);
+ List<KeyspaceShards> keyspaceShards = new ArrayList<>();
+ for (Map.Entry<String, Map<Range<Token>, Shard>> entry :
groupedShards.entrySet())
+ {
+ ReplicationParams params =
cluster.schema.getKeyspaceMetadata(entry.getKey()).params.replication;
+ ReplicaGroups originalGroups =
cluster.placements.get(params).writes; // prior to splitting
+
+ Map<Range<Token>, VersionedEndpoints.ForRange> splitGroups =
new HashMap<>();
+ for (Range<Token> splitRange : entry.getValue().keySet())
+ splitGroups.put(splitRange,
originalGroups.matchRange(splitRange));
+
+ keyspaceShards.add(new KeyspaceShards(entry.getKey(),
entry.getValue(), new ReplicaGroups(splitGroups)));
+ }
+ return keyspaceShards;
}
}
- // TODO (later): a more intelligent heuristic for offsets included in
broadcasts
- private static class ReplicatedOffsetsBroadcaster implements Runnable,
Shutdownable
+ private static final String HOST_LOG_ID_KEY = "local";
+
+ private static final String INSERT_QUERY =
+ format("INSERT INTO %s.%s (key, host_log_id) VALUES (?, ?)",
+ SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.HOST_LOG_ID);
+
+ static void persistHostLogIdToSystemTable(int hostLogId)
+ {
+ executeInternal(INSERT_QUERY, HOST_LOG_ID_KEY, hostLogId);
+ }
+
+ private static final String SELECT_QUERY =
+ format("SELECT * FROM %s.%s WHERE key = ?",
SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.HOST_LOG_ID);
+
+ static int loadHostLogIdFromSystemTable()
{
- private static final ScheduledExecutorPlus executor =
- executorFactory().scheduled("Replicated-Offsets-Broadcaster",
NORMAL);
+ UntypedResultSet rows = executeInternal(SELECT_QUERY, HOST_LOG_ID_KEY);
+ if (rows.isEmpty())
+ return 0;
+ return rows.one().getInt("host_log_id");
+ }
+ // TODO (later): a more intelligent heuristic for offsets included in
broadcasts
+ private static class ReplicatedOffsetsBroadcaster
+ {
// TODO (later): a more intelligent heuristic for scheduling broadcasts
- private static final long BROADCAST_INTERVAL_MILLIS = 200;
+ private static final long TRANSIENT_BROADCAST_INTERVAL_MILLIS = 200;
+ private static final long DURABLE_BROADCAST_INTERVAL_MILLIS = 60_000;
void start()
{
- executor.scheduleWithFixedDelay(this, BROADCAST_INTERVAL_MILLIS,
BROADCAST_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+ executor.scheduleWithFixedDelay(() -> run(false),
+
TRANSIENT_BROADCAST_INTERVAL_MILLIS,
+
TRANSIENT_BROADCAST_INTERVAL_MILLIS,
+ TimeUnit.MILLISECONDS);
+ executor.scheduleWithFixedDelay(() -> run(true),
+ DURABLE_BROADCAST_INTERVAL_MILLIS,
+ DURABLE_BROADCAST_INTERVAL_MILLIS,
+ TimeUnit.MILLISECONDS);
}
- @Override
- public boolean isTerminated()
+ public void run(boolean durable)
{
- return executor.isTerminated();
+ MutationTrackingService.instance.forEachKeyspace(ks -> run(ks,
durable));
}
- @Override
- public void shutdown()
+ private void run(KeyspaceShards shards, boolean durable)
{
- executor.shutdown();
+ shards.forEachShard(sh -> run(sh, durable));
}
- @Override
- public Object shutdownNow()
+ private void run(Shard shard, boolean durable)
{
- return executor.shutdownNow();
- }
+ BroadcastLogOffsets replicatedOffsets =
shard.collectReplicatedOffsets(durable);
+ if (replicatedOffsets.isEmpty())
+ return;
- @Override
- public boolean awaitTermination(long timeout, TimeUnit units) throws
InterruptedException
- {
- return executor.awaitTermination(timeout, units);
+ Message<BroadcastLogOffsets> message =
Message.out(Verb.BROADCAST_LOG_OFFSETS, replicatedOffsets);
+
+ for (InetAddressAndPort target : shard.remoteReplicas())
+ if (FailureDetector.instance.isAlive(target))
+ MessagingService.instance().send(message, target);
}
+ }
+
+ private static class LogStatePersister implements Runnable
+ {
+ // TODO (expected): consider a different interval
+ private static final long PERSIST_INTERVAL_MINUTES = 1;
- public void shutdownBlocking() throws InterruptedException
+ void start()
{
- if (!executor.isTerminated())
- {
- executor.shutdown();
- executor.awaitTermination(1, MINUTES);
- }
+ executor.scheduleWithFixedDelay(this, PERSIST_INTERVAL_MINUTES,
PERSIST_INTERVAL_MINUTES, TimeUnit.MINUTES);
}
@Override
@@ -505,22 +551,15 @@ public class MutationTrackingService
private void run(Shard shard)
{
- BroadcastLogOffsets replicatedOffsets =
shard.collectReplicatedOffsets();
- if (replicatedOffsets.isEmpty())
- return;
-
- Message<BroadcastLogOffsets> message =
Message.out(Verb.BROADCAST_LOG_OFFSETS, replicatedOffsets);
-
- for (InetAddressAndPort target : shard.remoteReplicas())
- if (FailureDetector.instance.isAlive(target))
- MessagingService.instance().send(message, target);
+ shard.updateLogsInSystemTable();
}
}
@VisibleForTesting
public void broadcastOffsetsForTesting()
{
- offsetsBroadcaster.run();
+ offsetsBroadcaster.run(false);
+ offsetsBroadcaster.run(true);
}
@VisibleForTesting
diff --git a/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java
b/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java
index 12fa18c98a..1001d855cd 100644
--- a/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java
+++ b/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java
@@ -17,6 +17,14 @@
*/
package org.apache.cassandra.replication;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Preconditions;
+
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.IntObjConsumer;
@@ -25,24 +33,93 @@ import org.agrona.collections.IntObjConsumer;
*/
public class Node2OffsetsMap
{
- private final Int2ObjectHashMap<Offsets> map = new Int2ObjectHashMap<>();
+ private final Int2ObjectHashMap<Offsets.Mutable> offsetsMap;
+
+ public Node2OffsetsMap()
+ {
+ offsetsMap = new Int2ObjectHashMap<>(8, 0.65f, false);
+ }
+
+ public static Node2OffsetsMap forParticipants(CoordinatorLogId logId,
Participants participants)
+ {
+ Node2OffsetsMap map = new Node2OffsetsMap();
+ for (int i = 0; i < participants.size(); i++)
+ map.set(participants.get(i), new Offsets.Mutable(logId));
+ return map;
+ }
+
+ void set(int node, Offsets.Mutable offsets)
+ {
+ offsetsMap.put(node, offsets);
+ }
+
+ @Nonnull
+ Offsets.Mutable get(int node)
+ {
+ return Preconditions.checkNotNull(offsetsMap.get(node));
+ }
+
+ Offsets.Mutable intersection()
+ {
+ Iterator<Offsets.Mutable> iter = offsetsMap.values().iterator();
+
+ Preconditions.checkArgument(iter.hasNext());
+ if (offsetsMap.size() == 1)
+ return Offsets.Mutable.copy(iter.next());
+
+ Offsets.Mutable intersection = iter.next();
+ while (iter.hasNext())
+ intersection = Offsets.Mutable.intersection(intersection,
iter.next());
+ return intersection;
+ }
public void add(int node, Offsets offsets)
{
- Offsets.Mutable current = (Offsets.Mutable) map.get(node);
+ Offsets.Mutable current = offsetsMap.get(node);
if (current != null)
current.addAll(offsets);
else
- map.put(node, Offsets.Mutable.copy(offsets));
+ offsetsMap.put(node, Offsets.Mutable.copy(offsets));
}
- public void forEach(IntObjConsumer<Offsets> consumer)
+ public void forEach(IntObjConsumer<Offsets.Mutable> consumer)
{
- map.forEachInt(consumer);
+ offsetsMap.forEachInt(consumer);
}
public void clear()
{
- map.clear();
+ offsetsMap.clear();
+ }
+
+ public int size()
+ {
+ return offsetsMap.size();
+ }
+
+ void convertToPrimitiveMap(Map<Integer, List<Integer>> into)
+ {
+ for (Int2ObjectHashMap<Offsets.Mutable>.EntryIterator iter =
offsetsMap.entrySet().iterator(); iter.hasNext();)
+ {
+ iter.next();
+ into.put(iter.getIntKey(), iter.getValue().asList());
+ }
+ }
+
+ static Node2OffsetsMap fromPrimitiveMap(CoordinatorLogId logId,
Map<Integer, List<Integer>> from)
+ {
+ Node2OffsetsMap map = new Node2OffsetsMap();
+ for (Map.Entry<Integer, List<Integer>> entry : from.entrySet())
+ map.set(entry.getKey(), Offsets.fromList(logId, entry.getValue()));
+ return map;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof Node2OffsetsMap))
+ return false;
+ Node2OffsetsMap that = (Node2OffsetsMap) o;
+ return this.offsetsMap.equals(that.offsetsMap);
}
}
diff --git a/src/java/org/apache/cassandra/replication/Offsets.java
b/src/java/org/apache/cassandra/replication/Offsets.java
index fc080452a7..dfb4f3f404 100644
--- a/src/java/org/apache/cassandra/replication/Offsets.java
+++ b/src/java/org/apache/cassandra/replication/Offsets.java
@@ -21,6 +21,7 @@ import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
+import org.agrona.collections.IntArrayList;
import org.apache.cassandra.db.Digest;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -34,6 +35,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
public abstract class Offsets implements Iterable<ShortMutationId>
{
@@ -200,6 +202,19 @@ public abstract class Offsets implements
Iterable<ShortMutationId>
digest.updateWithInt(bounds[i]);
}
+ List<Integer> asList()
+ {
+ return new IntArrayList(Arrays.copyOf(bounds, size), size,
IntArrayList.DEFAULT_NULL_VALUE);
+ }
+
+ static Mutable fromList(CoordinatorLogId logId, List<Integer> list)
+ {
+ int[] bounds = new int[list.size()];
+ for (int i = 0; i < list.size(); i++)
+ bounds[i] = list.get(i);
+ return new Mutable(logId, bounds, bounds.length);
+ }
+
static abstract class AbstractMutable<T extends AbstractMutable<T>>
extends Offsets implements OffsetReciever
{
AbstractMutable(RangeIterator rangeIterator)
@@ -380,6 +395,11 @@ public abstract class Offsets implements
Iterable<ShortMutationId>
// the offset is before all existing ranges, in-between two, or
after all existing ranges
}
+ public void add(int... offsets)
+ {
+ for (int offset : offsets) add(offset);
+ }
+
private enum AddAction
{
INSERT, MOVE, INCLUDE;
@@ -673,6 +693,19 @@ public abstract class Offsets implements
Iterable<ShortMutationId>
{
return Offsets.intersection(a, b, adaptor);
}
+
+ public static Mutable intersection(Offsets.Mutable... offsets)
+ {
+ Preconditions.checkArgument(offsets.length > 0);
+
+ if (offsets.length == 1)
+ return copy(offsets[0]);
+
+ Mutable intersection = offsets[0];
+ for (int i = 1; i < offsets.length; i++)
+ intersection = intersection(intersection, offsets[i]);
+ return intersection;
+ }
}
public static class Immutable extends Offsets
diff --git a/src/java/org/apache/cassandra/replication/Participants.java
b/src/java/org/apache/cassandra/replication/Participants.java
index f245797586..89494f240c 100644
--- a/src/java/org/apache/cassandra/replication/Participants.java
+++ b/src/java/org/apache/cassandra/replication/Participants.java
@@ -19,6 +19,9 @@ package org.apache.cassandra.replication;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Set;
+
+import org.agrona.collections.IntHashSet;
public class Participants
{
@@ -63,4 +66,21 @@ public class Participants
{
return Arrays.toString(hosts);
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof Participants))
+ return false;
+ Participants that = (Participants) o;
+ return Arrays.equals(this.hosts, that.hosts);
+ }
+
+ Set<Integer> asSet()
+ {
+ IntHashSet set = new IntHashSet(hosts.length);
+ for (int host : hosts)
+ set.add(host);
+ return set;
+ }
}
diff --git a/src/java/org/apache/cassandra/replication/Shard.java
b/src/java/org/apache/cassandra/replication/Shard.java
index 6588bede09..6f198e686d 100644
--- a/src/java/org/apache/cassandra/replication/Shard.java
+++ b/src/java/org/apache/cassandra/replication/Shard.java
@@ -18,72 +18,100 @@
package org.apache.cassandra.replication;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.function.BiConsumer;
-import java.util.function.IntSupplier;
+import java.util.function.LongSupplier;
import javax.annotation.Nonnull;
import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.agrona.collections.IntArrayList;
+import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.replication.CoordinatorLog.CoordinatorLogPrimary;
+import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.tcm.ClusterMetadata;
-import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.membership.NodeId;
import org.jctools.maps.NonBlockingHashMapLong;
+import static java.lang.String.format;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+
public class Shard
{
- private final String keyspace;
- private final Range<Token> tokenRange;
- private final int localHostId;
- private final Participants participants;
- private final Epoch sinceEpoch;
- private final BiConsumer<Shard, CoordinatorLog> onNewLog;
- private final NonBlockingHashMapLong<CoordinatorLog> logs;
- // TODO (expected): add support for log rotation
- private final CoordinatorLog.CoordinatorLogPrimary currentLocalLog;
+ private static final Logger logger = LoggerFactory.getLogger(Shard.class);
- private final List<Subscriber> subscribers = new ArrayList<>();
+ final int localNodeId;
+ final String keyspace;
+ final Range<Token> range;
+ final Participants participants;
- public interface Subscriber
- {
- default void onLogCreation(CoordinatorLog log) {}
- default void onSubscribe(CoordinatorLog currentLog) {}
- }
+ private final LongSupplier logIdProvider;
+ private final BiConsumer<Shard, CoordinatorLog> onNewLog;
+ private final NonBlockingHashMapLong<CoordinatorLog> logs;
+ private volatile CoordinatorLogPrimary currentLocalLog;
- Shard(String keyspace,
- Range<Token> tokenRange,
- int localHostId,
+ Shard(int localNodeId,
+ String keyspace,
+ Range<Token> range,
Participants participants,
- Epoch sinceEpoch,
- IntSupplier logIdProvider,
+ List<CoordinatorLog> logs,
+ LongSupplier logIdProvider,
BiConsumer<Shard, CoordinatorLog> onNewLog)
{
- Preconditions.checkArgument(participants.contains(localHostId));
+ Preconditions.checkArgument(participants.contains(localNodeId));
+ this.localNodeId = localNodeId;
this.keyspace = keyspace;
- this.tokenRange = tokenRange;
- this.localHostId = localHostId;
+ this.range = range;
this.participants = participants;
- this.sinceEpoch = sinceEpoch;
+ this.logIdProvider = logIdProvider;
this.logs = new NonBlockingHashMapLong<>();
this.onNewLog = onNewLog;
- this.currentLocalLog = startNewLog(localHostId,
logIdProvider.getAsInt(), participants);
- CoordinatorLogId logId = currentLocalLog.logId;
- Preconditions.checkArgument(!logId.isNone());
- logs.put(logId.asLong(), currentLocalLog);
+ for (CoordinatorLog log : logs)
+ {
+ this.logs.put(log.logId.asLong(), log);
+ onNewLog.accept(Shard.this, log);
+ }
+ this.currentLocalLog = createNewPrimaryLog();
+ }
+
+ Shard(int localNodeId, String keyspace, Range<Token> range, Participants
participants, LongSupplier logIdProvider, BiConsumer<Shard, CoordinatorLog>
onNewLog)
+ {
+ this(localNodeId, keyspace, range, participants,
Collections.emptyList(), logIdProvider, onNewLog);
}
MutationId nextId()
{
- return currentLocalLog.nextId();
+ MutationId nextId = currentLocalLog.nextId();
+ if (nextId != null)
+ return nextId;
+ return maybeRotateLocalLogAndGetNextId();
+ }
+
+ // if ids overflow, we need to rotate the local log
+ synchronized private MutationId maybeRotateLocalLogAndGetNextId()
+ {
+ MutationId nextId = currentLocalLog.nextId();
+ if (nextId != null) // another thread got to rotate before us
+ return nextId;
+ CoordinatorLogId oldLogId = currentLocalLog.logId;
+ currentLocalLog = createNewPrimaryLog();
+ logger.info("Rotated primary log for {}/{} from {} to {}", keyspace,
range, oldLogId, currentLocalLog.logId);
+ return nextId();
}
void receivedWriteResponse(ShortMutationId mutationId, InetAddressAndPort
fromHost)
@@ -92,11 +120,11 @@ public class Shard
getOrCreate(mutationId).receivedWriteResponse(mutationId, fromHostId);
}
- void updateReplicatedOffsets(List<? extends Offsets> offsets,
InetAddressAndPort onHost)
+ void updateReplicatedOffsets(List<? extends Offsets> offsets, boolean
durable, InetAddressAndPort onHost)
{
int onHostId = ClusterMetadata.current().directory.peerId(onHost).id();
for (Offsets logOffsets : offsets)
-
getOrCreate(logOffsets.logId()).updateReplicatedOffsets(logOffsets, onHostId);
+
getOrCreate(logOffsets.logId()).updateReplicatedOffsets(logOffsets, durable,
onHostId);
}
boolean startWriting(Mutation mutation)
@@ -143,39 +171,31 @@ public class Shard
for (int i = 0, size = participants.size(); i < size; ++i)
{
int hostId = participants.get(i);
- if (hostId != localHostId)
+ if (hostId != localNodeId)
replicas.add(ClusterMetadata.current().directory.endpoint(new
NodeId(hostId)));
}
return replicas;
}
+ boolean isDurablyReconciled(long logId, CoordinatorLogOffsets<?>
logOffsets)
+ {
+ return logs.get(logId).isDurablyReconciled(logOffsets);
+ }
+
/**
* Collects replicated offsets for the logs owned by this coordinator on
this shard.
*/
- BroadcastLogOffsets collectReplicatedOffsets()
+ BroadcastLogOffsets collectReplicatedOffsets(boolean durable)
{
List<Offsets.Immutable> offsets = new ArrayList<>();
for (CoordinatorLog log : logs.values())
{
- Offsets.Immutable logOffsets = log.collectReplicatedOffsets();
+ Offsets.Immutable logOffsets =
log.collectReplicatedOffsets(durable);
if (logOffsets != null)
offsets.add(logOffsets);
}
- return new BroadcastLogOffsets(keyspace, tokenRange, offsets);
- }
-
- /**
- * Creates a new coordinator log for this host. Primarily on Shard init
(node startup or topology change).
- * Also on keyspace creation.
- */
- private CoordinatorLog.CoordinatorLogPrimary startNewLog(int localHostId,
int hostLogId, Participants participants)
- {
- CoordinatorLogId logId = new CoordinatorLogId(localHostId, hostLogId);
- CoordinatorLog.CoordinatorLogPrimary log =
- new CoordinatorLog.CoordinatorLogPrimary(localHostId, logId,
participants);
- onNewLog.accept(this, log);
- return log;
+ return new BroadcastLogOffsets(keyspace, range, offsets, durable);
}
private CoordinatorLog getOrCreate(Mutation mutation)
@@ -203,27 +223,63 @@ public class Shard
private CoordinatorLog getOrCreate(long logId)
{
CoordinatorLog log = logs.get(logId);
- if (log != null)
- return log;
- CoordinatorLog newLog = logs.computeIfAbsent(logId, ignore ->
CoordinatorLog.create(localHostId, new CoordinatorLogId(logId), participants));
- onNewLog.accept(this, newLog);
- for (Subscriber subscriber : subscribers)
- subscriber.onLogCreation(newLog);
- return newLog;
+ return log != null ? log : createNewLog(logId);
}
- public void addSubscriber(Subscriber subscriber)
+ /**
+ * Creates a new coordinator log for this host. Primarily on Shard init
(node startup or topology change) and on keyspace creation.
+ */
+ private CoordinatorLog createNewLog(long logId)
{
- subscriber.onSubscribe(currentLocalLog);
- subscribers.add(subscriber);
+ CoordinatorLog next = CoordinatorLog.create(keyspace, range,
localNodeId, new CoordinatorLogId(logId), participants);
+ CoordinatorLog prev = logs.putIfAbsent(logId, next);
+ if (null == prev) onNewLog.accept(this, next);
+ return null != prev ? prev : next;
}
- private CoordinatorLog create(long logId)
+ private CoordinatorLogPrimary createNewPrimaryLog()
{
- CoordinatorLog log = CoordinatorLog.create(localHostId, new
CoordinatorLogId(logId), participants);
- onNewLog.accept(this, log);
- for (Subscriber subscriber : subscribers)
- subscriber.onLogCreation(log);
- return log;
+ return (CoordinatorLogPrimary) createNewLog(logIdProvider.getAsLong());
+ }
+
+ /*
+ * Persist to / load from system table.
+ */
+
+ private static final String INSERT_QUERY =
+ format("INSERT INTO %s.%s (keyspace_name, range_start, range_end,
participants) VALUES (?, ?, ?, ?)",
+ SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.SHARDS);
+
+ void persistToSystemTables()
+ {
+ executeInternal(INSERT_QUERY, keyspace, range.left.toString(),
range.right.toString(), participants.asSet());
+ for (CoordinatorLog log : logs.values())
+ log.persistToSystemTable();
+ }
+
+ void updateLogsInSystemTable()
+ {
+ for (CoordinatorLog log : logs.values())
+ log.updateLogsInSystemTable();
+ }
+
+ private static final String SELECT_QUERY =
+ format("SELECT * FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.SHARDS);
+
+ static ArrayList<Shard> loadFromSystemTables(int localNodeId, LongSupplier
logIdProvider, BiConsumer<Shard, CoordinatorLog> onNewLog)
+ {
+ Token.TokenFactory factory =
ClusterMetadata.current().partitioner.getTokenFactory();
+ ArrayList<Shard> shards = new ArrayList<>();
+ for (UntypedResultSet.Row row : executeInternal(SELECT_QUERY))
+ {
+ String keyspace = row.getString("keyspace_name");
+ String rangeStart = row.getString("range_start");
+ String rangeEnd = row.getString("range_end");
+ Range<Token> range = new Range<>(factory.fromString(rangeStart),
factory.fromString(rangeEnd));
+ Set<Integer> participants = row.getFrozenSet("participants",
Int32Type.instance);
+ List<CoordinatorLog> logs =
CoordinatorLog.loadFromSystemTable(keyspace, range, localNodeId);
+ shards.add(new Shard(localNodeId, keyspace, range, new
Participants(participants), logs, logIdProvider, onNewLog));
+ }
+ return shards;
}
}
diff --git
a/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
index 5a6e9d1f55..63dff77349 100644
--- a/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
+++ b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
@@ -23,6 +23,7 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
@@ -197,4 +198,41 @@ public class UnreconciledMutations
return collect(statesSet.subSet(start, end), tableId,
includePending, into);
}
}
+
+ @VisibleForTesting
+ boolean equalsForTesting(UnreconciledMutations other)
+ {
+ return this.statesMap.equals(other.statesMap) &&
this.statesSet.equals(other.statesSet);
+ }
+
+ @VisibleForTesting
+ void addDirectly(Mutation mutation)
+ {
+ Entry entry = Entry.create(mutation);
+ entry.visibility = Visibility.VISIBLE;
+ statesMap.put(entry.offset, entry);
+ statesSet.add(entry);
+ }
+
+ static UnreconciledMutations loadFromJournal(Node2OffsetsMap
witnessedOffsets, int localNodeId)
+ {
+ UnreconciledMutations result = new UnreconciledMutations();
+
+ Offsets.Mutable witnessed = witnessedOffsets.get(localNodeId);
+ Offsets.Mutable reconciled = witnessedOffsets.intersection();
+
+ // difference between locally witnessed offsets and fully reconciled
ones is all the ids
+ // that need to be loaded into UnreconciledMutations index
+ Offsets.RangeIterator iter =
Offsets.difference(witnessed.rangeIterator(), reconciled.rangeIterator());
+ while (iter.tryAdvance())
+ {
+ for (int offset = iter.start(), end = iter.end(); offset <= end;
offset++)
+ {
+ ShortMutationId id = new ShortMutationId(witnessed.logId,
offset);
+ result.addDirectly(MutationJournal.instance.read(id));
+ }
+ }
+
+ return result;
+ }
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/log/SystemKeyspaceStorageTest.java
b/test/distributed/org/apache/cassandra/distributed/test/log/SystemKeyspaceStorageTest.java
index c4a4079225..db2a2be94d 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/log/SystemKeyspaceStorageTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/log/SystemKeyspaceStorageTest.java
@@ -38,7 +38,7 @@ import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.FBUtilities;
-import static org.apache.cassandra.db.SystemKeyspace.SNAPSHOT_TABLE_NAME;
+import static org.apache.cassandra.db.SystemKeyspace.METADATA_SNAPSHOTS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -183,7 +183,7 @@ public class SystemKeyspaceStorageTest extends
CoordinatorPathTestBase
public static void deleteSnapshot(long epoch)
{
- String query = String.format("DELETE FROM %s.%s WHERE epoch = ?",
SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME);
+ String query = String.format("DELETE FROM %s.%s WHERE epoch = ?",
SchemaConstants.SYSTEM_KEYSPACE_NAME, METADATA_SNAPSHOTS);
QueryProcessor.executeInternal(query, epoch);
}
}
diff --git a/test/unit/org/apache/cassandra/cql3/SystemKeyspaceQueryTest.java
b/test/unit/org/apache/cassandra/cql3/SystemKeyspaceQueryTest.java
index 8da29df678..f5789d01f5 100644
--- a/test/unit/org/apache/cassandra/cql3/SystemKeyspaceQueryTest.java
+++ b/test/unit/org/apache/cassandra/cql3/SystemKeyspaceQueryTest.java
@@ -26,7 +26,7 @@ import org.apache.cassandra.utils.TimeUUID;
import static org.apache.cassandra.db.SystemKeyspace.BATCHES;
import static org.apache.cassandra.db.SystemKeyspace.LOCAL;
import static org.apache.cassandra.db.SystemKeyspace.METADATA_LOG;
-import static org.apache.cassandra.db.SystemKeyspace.SNAPSHOT_TABLE_NAME;
+import static org.apache.cassandra.db.SystemKeyspace.METADATA_SNAPSHOTS;
import static org.apache.cassandra.schema.SchemaConstants.SYSTEM_KEYSPACE_NAME;
public class SystemKeyspaceQueryTest extends CQLTester
@@ -44,7 +44,7 @@ public class SystemKeyspaceQueryTest extends CQLTester
assertRowCountNet(executeNet(String.format("SELECT * FROM %s.%s WHERE
epoch = 1",
SYSTEM_KEYSPACE_NAME,
METADATA_LOG)), 0);
assertRowCountNet(executeNet(String.format("SELECT * FROM %s.%s WHERE
epoch = 1",
- SYSTEM_KEYSPACE_NAME,
SNAPSHOT_TABLE_NAME)), 0);
+ SYSTEM_KEYSPACE_NAME,
METADATA_SNAPSHOTS)), 0);
// system.batches table uses LocalPartitioner
assertRowCountNet(executeNet(String.format("SELECT * FROM %s.%s WHERE
id = %s",
SYSTEM_KEYSPACE_NAME,
BATCHES,
diff --git
a/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java
b/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java
index 91a2788ddf..07f4bc5eaf 100644
--- a/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java
+++ b/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java
@@ -280,7 +280,7 @@ public class CoordinatorLogOffsetsTest
ImmutableCoordinatorLogOffsets logOffsets = new
ImmutableCoordinatorLogOffsets.Builder()
.add(mutation.id())
.build();
-
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(ks,
logOffsets)).isTrue();
+
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsets)).isTrue();
}
// Applied locally but not on remote replicas
@@ -292,7 +292,7 @@ public class CoordinatorLogOffsetsTest
ImmutableCoordinatorLogOffsets logOffsets = new
ImmutableCoordinatorLogOffsets.Builder()
.add(mutation.id())
.build();
-
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(ks,
logOffsets)).isFalse();
+
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsets)).isFalse();
}
// Applied on remote replicas but not locally
@@ -306,7 +306,7 @@ public class CoordinatorLogOffsetsTest
ImmutableCoordinatorLogOffsets logOffsets = new
ImmutableCoordinatorLogOffsets.Builder()
.add(mutation.id())
.build();
-
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(ks,
logOffsets)).isFalse();
+
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsets)).isFalse();
}
// If no replicas are aware of a log, it should be considered
unreconciled out of caution
@@ -326,7 +326,7 @@ public class CoordinatorLogOffsetsTest
ImmutableCoordinatorLogOffsets.Builder logOffsetsBuilder = new
ImmutableCoordinatorLogOffsets.Builder();
logOffsetsBuilder.add(fakeMutationId);
-
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(ks,
logOffsetsBuilder.build())).isFalse();
+
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsetsBuilder.build())).isFalse();
}
MutationTrackingService.instance.shutdownBlocking();
diff --git a/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java
b/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java
index f89f1a521b..5122683b41 100644
--- a/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java
+++ b/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -30,6 +29,7 @@ import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.replication.CoordinatorLog.CoordinatorLogPrimary;
import org.apache.cassandra.schema.KeyspaceParams;
@@ -38,11 +38,21 @@ import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
public class CoordinatorLogTest
{
private static final int LOCAL_HOST_ID = 1;
- private static final CoordinatorLogId LOG_ID = new
CoordinatorLogId(LOCAL_HOST_ID, 1);
- private static final Participants PARTICIPANTS = new
Participants(List.of(LOCAL_HOST_ID, 2, 3));
+ private static final int REMOTE_HOST_ID_1 = 2;
+ private static final int REMOTE_HOST_ID_2 = 3;
+
+ private static final CoordinatorLogId LOCAL_LOG_ID = new
CoordinatorLogId(LOCAL_HOST_ID, 1);
+ private static final CoordinatorLogId REPLICA_LOG_ID = new
CoordinatorLogId(REMOTE_HOST_ID_1, 1);
+
+ private static final Participants PARTICIPANTS =
+ new Participants(List.of(LOCAL_HOST_ID, REMOTE_HOST_ID_1,
REMOTE_HOST_ID_2));
private static final String KEYSPACE = "cltks";
private static final String TABLE = "cltt";
@@ -57,6 +67,7 @@ public class CoordinatorLogTest
.addClusteringColumn("ck",
UTF8Type.instance)
.addRegularColumn("value",
UTF8Type.instance)
.build());
+ MutationJournal.instance.start();
}
private static Token tk(String key)
@@ -66,7 +77,7 @@ public class CoordinatorLogTest
private static Offsets toOffsets(MutationId... ids)
{
- Offsets.Mutable list = new Offsets.Mutable(LOG_ID);
+ Offsets.Mutable list = new Offsets.Mutable(LOCAL_LOG_ID);
for (MutationId id : ids)
list.add(id.offset());
return list;
@@ -74,15 +85,15 @@ public class CoordinatorLogTest
private static void assertUnreconciled(Token token, TableId tableId,
CoordinatorLog log, boolean includePending, Offsets expectedReconciled,
MutationId... expectedIds)
{
- Offsets.Mutable reconciled = new Offsets.Mutable(LOG_ID);
- Offsets.Mutable unreconciled = new Offsets.Mutable(LOG_ID);
+ Offsets.Mutable reconciled = new Offsets.Mutable(LOCAL_LOG_ID);
+ Offsets.Mutable unreconciled = new Offsets.Mutable(LOCAL_LOG_ID);
log.collectOffsetsFor(token, tableId, includePending, unreconciled,
reconciled);
for (MutationId mid : expectedIds)
- Assert.assertTrue(unreconciled.contains(mid.offset()));
+ assertTrue(unreconciled.contains(mid.offset()));
- Assert.assertEquals(toOffsets(expectedIds), unreconciled);
- Assert.assertEquals(expectedReconciled, reconciled);
+ assertEquals(toOffsets(expectedIds), unreconciled);
+ assertEquals(expectedReconciled, reconciled);
}
@Test
@@ -91,24 +102,18 @@ public class CoordinatorLogTest
Token tk = tk("key");
TableMetadata metadata = Schema.instance.getTableMetadata(KEYSPACE,
TABLE);
TableId tableId = metadata.id;
- CoordinatorLogPrimary log = new CoordinatorLogPrimary(LOCAL_HOST_ID,
LOG_ID, PARTICIPANTS);
+ CoordinatorLogPrimary log = new CoordinatorLogPrimary(KEYSPACE, new
Range<>(tk, tk), LOCAL_HOST_ID, LOCAL_LOG_ID, PARTICIPANTS);
MutationId[] ids = new MutationId[] { log.nextId(), log.nextId(),
log.nextId(), };
List<Mutation> mutations = new ArrayList<>(ids.length);
for (MutationId id : ids)
{
- Mutation mutation =
- new RowUpdateBuilder(metadata, 0, "key")
- .clustering("ck")
- .add("value", "value")
- .build()
- .withMutationId(id);
-
+ Mutation mutation = createMutation(id);
mutations.add(mutation);
log.startWriting(mutation);
}
- Offsets.Mutable reconciled = new Offsets.Mutable(LOG_ID);
+ Offsets.Mutable reconciled = new Offsets.Mutable(LOCAL_LOG_ID);
// we've only started writing, so the ids shouldn't appear without
includePending being true
assertUnreconciled(tk, tableId, log, false, reconciled);
assertUnreconciled(tk, tableId, log, true, reconciled, ids);
@@ -126,4 +131,86 @@ public class CoordinatorLogTest
reconciled.add(ids[0].offset());
assertUnreconciled(tk, tableId, log, false, reconciled, ids[1],
ids[2]);
}
+
+ private Mutation createMutation(MutationId id)
+ {
+ TableMetadata metadata = Schema.instance.getTableMetadata(KEYSPACE,
TABLE);
+ return new RowUpdateBuilder(metadata, 0, "key")
+ .clustering("ck")
+ .add("value", "value")
+ .build()
+ .withMutationId(id);
+ }
+
+ @Test
+ public void persistAndLoadPrimaryLogTest()
+ {
+ testPersistAndLoadRoundtrip(LOCAL_LOG_ID);
+ }
+
+ @Test
+ public void persistAndLoadReplicaLogTest()
+ {
+ testPersistAndLoadRoundtrip(REPLICA_LOG_ID);
+ }
+
+ private void testPersistAndLoadRoundtrip(CoordinatorLogId logId)
+ {
+ Range<Token> range = new Range<>(tk("a"), tk("b"));
+
+ Offsets.Mutable offsets1 = new Offsets.Mutable(logId);
+ offsets1.add(1, 2, 3, 4);
+ Offsets.Mutable offsets2 = new Offsets.Mutable(logId);
+ offsets2.add(2, 3, 4, 5);
+ Offsets.Mutable offsets3 = new Offsets.Mutable(logId);
+ offsets3.add(3, 4, 5, 6);
+
+ Node2OffsetsMap witnessed = new Node2OffsetsMap();
+ witnessed.set(LOCAL_HOST_ID, offsets1);
+ witnessed.set(REMOTE_HOST_ID_1, offsets2);
+ witnessed.set(REMOTE_HOST_ID_2, offsets3);
+
+ UnreconciledMutations unreconciled = new UnreconciledMutations();
+ Mutation mutation1 = createMutation(new MutationId(logId.asLong(),
MutationId.sequenceId(1, 0)));
+ Mutation mutation2 = createMutation(new MutationId(logId.asLong(),
MutationId.sequenceId(2, 0)));
+ unreconciled.addDirectly(mutation1);
+ unreconciled.addDirectly(mutation2);
+ MutationJournal.instance.write(mutation1.id(), mutation1);
+ MutationJournal.instance.write(mutation2.id(), mutation2);
+
+ CoordinatorLog log =
+ CoordinatorLog.recreate(KEYSPACE, range, LOCAL_HOST_ID, logId,
PARTICIPANTS, witnessed, witnessed, unreconciled);
+
+ Offsets.Mutable reconciled = new Offsets.Mutable(logId);
+ reconciled.add(3, 4);
+ assertEquals(reconciled, log.reconciledOffsets);
+
+ validatePersistAndLoadRoundtrip(log);
+ log.deleteFromSystemTable();
+ }
+
+ private static void validatePersistAndLoadRoundtrip(CoordinatorLog log)
+ {
+ log.persistToSystemTable();
+ List<CoordinatorLog> logs =
CoordinatorLog.loadFromSystemTable(KEYSPACE, log.range, LOCAL_HOST_ID);
+ assertEquals(1, logs.size());
+ CoordinatorLog loaded = logs.get(0);
+
+ assertSame(log.getClass(), loaded.getClass());
+ assertEquals(log.keyspace, loaded.keyspace);
+ assertEquals(log.range, loaded.range);
+ assertEquals(log.logId, loaded.logId);
+ assertEquals(log.participants, loaded.participants);
+ assertEquals(log.localNodeId, loaded.localNodeId);
+
+ assertEquals(log.participants.size(), log.witnessedOffsets.size());
+ assertEquals(log.participants.size(), log.persistedOffsets.size());
+ assertEquals(loaded.participants.size(),
loaded.witnessedOffsets.size());
+ assertEquals(loaded.participants.size(),
loaded.persistedOffsets.size());
+ assertEquals(log.witnessedOffsets, loaded.witnessedOffsets);
+ assertEquals(log.persistedOffsets, loaded.persistedOffsets);
+ assertEquals(log.reconciledOffsets, loaded.reconciledOffsets);
+
+
assertTrue(log.unreconciledMutations.equalsForTesting(loaded.unreconciledMutations));
+ }
}
diff --git a/test/unit/org/apache/cassandra/replication/OffsetsTest.java
b/test/unit/org/apache/cassandra/replication/OffsetsTest.java
index 61c0a9ba5f..b0d5edda0f 100644
--- a/test/unit/org/apache/cassandra/replication/OffsetsTest.java
+++ b/test/unit/org/apache/cassandra/replication/OffsetsTest.java
@@ -843,21 +843,28 @@ public class OffsetsTest
     public void testRemoveWithExactSizedArray()
     {
         {
-            Offsets.Mutable offsets = new Offsets.Mutable(LOG_ID, new int[] {10, 11}, 2);
+            Offsets.Mutable offsets = new Offsets.Mutable(LOG_ID, new int[]{ 10, 11 }, 2);
             offsets.remove(10);
             assertOffsetsEqual(offsets(11, 11), offsets);
         }
         {
-            Offsets.Mutable offsets = new Offsets.Mutable(LOG_ID, new int[] {10, 11}, 2);
+            Offsets.Mutable offsets = new Offsets.Mutable(LOG_ID, new int[]{ 10, 11 }, 2);
             offsets.remove(11);
             assertOffsetsEqual(offsets(10, 10), offsets);
         }
         {
-            Offsets.Mutable offsets = new Offsets.Mutable(LOG_ID, new int[] {11, 11}, 2);
+            Offsets.Mutable offsets = new Offsets.Mutable(LOG_ID, new int[]{ 11, 11 }, 2);
             offsets.remove(11);
             assertOffsetsEqual(offsets(), offsets);
         }
     }
+
+    @Test
+    public void asListFromListRoundTripTest()
+    {
+        for (Offsets.Mutable offsets : new Offsets.Mutable[] { offsets(), offsets(1, 2), offsets(1, 3, 7, 9) })
+            assertOffsetsEqual(offsets, Offsets.fromList(LOG_ID, offsets.asList()));
+    }
}
diff --git a/test/unit/org/apache/cassandra/replication/ShardTest.java
b/test/unit/org/apache/cassandra/replication/ShardTest.java
new file mode 100644
index 0000000000..d8c62554f1
--- /dev/null
+++ b/test/unit/org/apache/cassandra/replication/ShardTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.LongSupplier;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.agrona.collections.MutableInteger;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.junit.Assert.assertEquals;
+
+public class ShardTest
+{
+ private static final int LOCAL_HOST_ID = 1;
+ private static final int REMOTE_HOST_ID_1 = 2;
+ private static final int REMOTE_HOST_ID_2 = 3;
+
+ private static final String KEYSPACE = "shard_test_ks";
+ private static final String TABLE = "shard_test_table";
+
+ @BeforeClass
+ public static void setUp() throws IOException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(3),
+ TableMetadata.builder(KEYSPACE, TABLE)
+ .addPartitionKeyColumn("pk",
UTF8Type.instance)
+ .addClusteringColumn("ck",
UTF8Type.instance)
+ .addRegularColumn("value",
UTF8Type.instance)
+ .build());
+ MutationJournal.instance.start();
+ }
+
+ private static Token tk(String key)
+ {
+ return new
ByteOrderedPartitioner.BytesToken(ByteBufferUtil.bytes(key));
+ }
+
+ @Test
+ public void testPersistAndLoadSingleShard()
+ {
+ Range<Token> range = new Range<>(tk("a"), tk("z"));
+ Participants participants = new Participants(List.of(LOCAL_HOST_ID,
REMOTE_HOST_ID_1, REMOTE_HOST_ID_2));
+ MutableInteger logId = new MutableInteger();
+ LongSupplier logIdProvider = () ->
CoordinatorLogId.asLong(LOCAL_HOST_ID, logId.getAndIncrement());
+
+ Shard original = new Shard(LOCAL_HOST_ID, KEYSPACE, range,
participants, logIdProvider, (s, l) -> {});
+ original.persistToSystemTables();
+
+ ArrayList<Shard> loadedShards =
Shard.loadFromSystemTables(LOCAL_HOST_ID, logIdProvider, (s, l) -> {});
+ assertEquals(1, loadedShards.size());
+ Shard loaded = loadedShards.get(0);
+
+ assertEquals(original.localNodeId, loaded.localNodeId);
+ assertEquals(original.keyspace, loaded.keyspace);
+ assertEquals(original.range, loaded.range);
+ assertEquals(original.participants, loaded.participants);
+ // TODO: compare the coordinator logs
+ }
+
+ @Test
+ public void testLogRotation()
+ {
+ CoordinatorLog.overrideMaxOffsetForTesting(100);
+ try
+ {
+ Range<Token> range = new Range<>(tk("a"), tk("z"));
+ Participants participants = new
Participants(List.of(LOCAL_HOST_ID, REMOTE_HOST_ID_1, REMOTE_HOST_ID_2));
+ MutableInteger logId = new MutableInteger();
+ LongSupplier logIdProvider = () ->
CoordinatorLogId.asLong(LOCAL_HOST_ID, logId.getAndIncrement());
+ Shard shard = new Shard(LOCAL_HOST_ID, KEYSPACE, range,
participants, logIdProvider, (s, l) -> {
+ });
+
+ MutationId firstId = shard.nextId();
+ for (int i = 0; i < 100; i++)
+ assertEquals(firstId.hostLogId, shard.nextId().hostLogId);
+ assertEquals(firstId.hostLogId + 1, shard.nextId().hostLogId);
+ }
+ finally
+ {
+ CoordinatorLog.overrideMaxOffsetForTesting(Integer.MAX_VALUE);
+ }
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java
b/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java
index dfa025a327..281c8da079 100644
--- a/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java
+++ b/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java
@@ -43,10 +43,13 @@ import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
+import static org.junit.Assert.assertEquals;
+
public class UnreconciledMutationsTest
{
private static final String KEYSPACE = "ks";
private static final String TABLE = "tbl";
+ private static TableId TABLE_ID;
@BeforeClass
public static void setUp() throws IOException
@@ -57,6 +60,7 @@ public class UnreconciledMutationsTest
.addPartitionKeyColumn("k",
Int32Type.instance)
.addRegularColumn("v",
Int32Type.instance)
.build());
+ TABLE_ID = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
}
private static Token tokenFor(int key)
@@ -69,9 +73,7 @@ public class UnreconciledMutationsTest
TableMetadata metadata = Schema.instance.getTableMetadata(KEYSPACE,
TABLE);
// Create a MutationId with logId 1L and sequenceId that produces the
desired offset
- long logId = 1L;
- long sequenceId = ((long) offset << 32) | 1000; // offset in high
bits, timestamp in low bits
- MutationId mutationId = new MutationId(logId, sequenceId);
+ MutationId mutationId = new MutationId(CoordinatorLogId.asLong(1, 1),
MutationId.sequenceId(offset, 0));
Mutation mutation = new RowUpdateBuilder(metadata, 0, partitionKey)
.add("v", value)
@@ -90,8 +92,7 @@ public class UnreconciledMutationsTest
public void testSingleTokenCollectionIsolation()
{
UnreconciledMutations unreconciled = new UnreconciledMutations();
- TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
-
+
// Create mutations for different partition keys with different tokens
Mutation mutation1 = createMutation(100, 1000, 1);
Mutation mutation2 = createMutation(200, 2000, 2);
@@ -113,30 +114,30 @@ public class UnreconciledMutationsTest
// Test single token collection for token1 - should ONLY return
mutation1
Offsets.Mutable offsets1 = new Offsets.Mutable(new CoordinatorLogId(1,
1));
- boolean found1 = unreconciled.collect(token1, tableId, false,
offsets1);
+ boolean found1 = unreconciled.collect(token1, TABLE_ID, false,
offsets1);
Assert.assertTrue("Should find mutations for token1", found1);
- Assert.assertEquals("Should only contain 1 offset for token1", 1,
offsets1.offsetCount());
+ assertEquals("Should only contain 1 offset for token1", 1,
offsets1.offsetCount());
Assert.assertTrue("Should contain offset 1 for mutation1",
offsets1.contains(1));
Assert.assertFalse("Should NOT contain offset 2 for mutation2",
offsets1.contains(2));
Assert.assertFalse("Should NOT contain offset 3 for mutation3",
offsets1.contains(3));
// Test single token collection for token2 - should ONLY return
mutation2
Offsets.Mutable offsets2 = new Offsets.Mutable(new CoordinatorLogId(1,
1));
- boolean found2 = unreconciled.collect(token2, tableId, false,
offsets2);
+ boolean found2 = unreconciled.collect(token2, TABLE_ID, false,
offsets2);
Assert.assertTrue("Should find mutations for token2", found2);
- Assert.assertEquals("Should only contain 1 offset for token2", 1,
offsets2.offsetCount());
+ assertEquals("Should only contain 1 offset for token2", 1,
offsets2.offsetCount());
Assert.assertTrue("Should contain offset 2 for mutation2",
offsets2.contains(2));
Assert.assertFalse("Should NOT contain offset 1 for mutation1",
offsets2.contains(1));
Assert.assertFalse("Should NOT contain offset 3 for mutation3",
offsets2.contains(3));
// Test single token collection for token3 - should ONLY return
mutation3
Offsets.Mutable offsets3 = new Offsets.Mutable(new CoordinatorLogId(1,
1));
- boolean found3 = unreconciled.collect(token3, tableId, false,
offsets3);
+ boolean found3 = unreconciled.collect(token3, TABLE_ID, false,
offsets3);
Assert.assertTrue("Should find mutations for token3", found3);
- Assert.assertEquals("Should only contain 1 offset for token3", 1,
offsets3.offsetCount());
+ assertEquals("Should only contain 1 offset for token3", 1,
offsets3.offsetCount());
Assert.assertTrue("Should contain offset 3 for mutation3",
offsets3.contains(3));
Assert.assertFalse("Should NOT contain offset 1 for mutation1",
offsets3.contains(1));
Assert.assertFalse("Should NOT contain offset 2 for mutation2",
offsets3.contains(2));
@@ -146,11 +147,10 @@ public class UnreconciledMutationsTest
public void testEmptyCollection()
{
UnreconciledMutations unreconciled = new UnreconciledMutations();
- TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
Token token = tokenFor(100);
Offsets.Mutable offsets = new Offsets.Mutable(new CoordinatorLogId(1,
1));
- boolean found = unreconciled.collect(token, tableId, false, offsets);
+ boolean found = unreconciled.collect(token, TABLE_ID, false, offsets);
Assert.assertFalse("Should not find any mutations in empty
collection", found);
Assert.assertTrue("Should have no offsets", offsets.isEmpty());
@@ -160,8 +160,7 @@ public class UnreconciledMutationsTest
public void testPendingVsVisibleMutations()
{
UnreconciledMutations unreconciled = new UnreconciledMutations();
- TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
-
+
Mutation pendingMutation = createMutation(100, 1000, 1);
Mutation visibleMutation = createMutation(100, 2000, 2);
Token token = pendingMutation.key().getToken();
@@ -173,19 +172,19 @@ public class UnreconciledMutationsTest
// Test without including pending - should only get visible mutation
Offsets.Mutable visibleOnly = new Offsets.Mutable(new
CoordinatorLogId(1, 1));
- boolean foundVisible = unreconciled.collect(token, tableId, false,
visibleOnly);
+ boolean foundVisible = unreconciled.collect(token, TABLE_ID, false,
visibleOnly);
Assert.assertTrue("Should find visible mutations", foundVisible);
- Assert.assertEquals("Should only have 1 visible mutation", 1,
visibleOnly.offsetCount());
+ assertEquals("Should only have 1 visible mutation", 1,
visibleOnly.offsetCount());
Assert.assertTrue("Should contain visible mutation offset",
visibleOnly.contains(2));
Assert.assertFalse("Should NOT contain pending mutation offset",
visibleOnly.contains(1));
// Test including pending - should get both mutations
Offsets.Mutable includingPending = new Offsets.Mutable(new
CoordinatorLogId(1, 1));
- boolean foundAll = unreconciled.collect(token, tableId, true,
includingPending);
+ boolean foundAll = unreconciled.collect(token, TABLE_ID, true,
includingPending);
Assert.assertTrue("Should find all mutations", foundAll);
- Assert.assertEquals("Should have 2 mutations total", 2,
includingPending.offsetCount());
+ assertEquals("Should have 2 mutations total", 2,
includingPending.offsetCount());
Assert.assertTrue("Should contain pending mutation offset",
includingPending.contains(1));
Assert.assertTrue("Should contain visible mutation offset",
includingPending.contains(2));
}
@@ -194,8 +193,7 @@ public class UnreconciledMutationsTest
public void testMultipleMutationsSameToken()
{
UnreconciledMutations unreconciled = new UnreconciledMutations();
- TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
-
+
// Create multiple mutations for the same partition key (same token)
Mutation mutation1 = createMutation(100, 1000, 1);
Mutation mutation2 = createMutation(100, 2000, 2);
@@ -211,10 +209,10 @@ public class UnreconciledMutationsTest
unreconciled.finishWriting(mutation3);
Offsets.Mutable offsets = new Offsets.Mutable(new CoordinatorLogId(1,
1));
- boolean found = unreconciled.collect(token, tableId, false, offsets);
+ boolean found = unreconciled.collect(token, TABLE_ID, false, offsets);
Assert.assertTrue("Should find mutations for token", found);
- Assert.assertEquals("Should have all 3 mutations for same token", 3,
offsets.offsetCount());
+ assertEquals("Should have all 3 mutations for same token", 3,
offsets.offsetCount());
Assert.assertTrue("Should contain offset 1", offsets.contains(1));
Assert.assertTrue("Should contain offset 2", offsets.contains(2));
Assert.assertTrue("Should contain offset 3", offsets.contains(3));
@@ -224,8 +222,7 @@ public class UnreconciledMutationsTest
public void testTableIdFiltering()
{
UnreconciledMutations unreconciled = new UnreconciledMutations();
- TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
-
+
// Create a fake different table ID
TableId differentTableId = TableId.generate();
@@ -237,10 +234,10 @@ public class UnreconciledMutationsTest
// Query with correct table ID - should find mutation
Offsets.Mutable correctTable = new Offsets.Mutable(new
CoordinatorLogId(1, 1));
- boolean foundCorrect = unreconciled.collect(token, tableId, false,
correctTable);
+ boolean foundCorrect = unreconciled.collect(token, TABLE_ID, false,
correctTable);
Assert.assertTrue("Should find mutation for correct table",
foundCorrect);
- Assert.assertEquals("Should have 1 mutation", 1,
correctTable.offsetCount());
+ assertEquals("Should have 1 mutation", 1, correctTable.offsetCount());
// Query with different table ID - should find nothing
Offsets.Mutable differentTable = new Offsets.Mutable(new
CoordinatorLogId(1, 1));
@@ -254,8 +251,7 @@ public class UnreconciledMutationsTest
public void testMutationRemoval()
{
UnreconciledMutations unreconciled = new UnreconciledMutations();
- TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
-
+
Mutation mutation = createMutation(100, 1000, 1);
Token token = mutation.key().getToken();
@@ -264,16 +260,16 @@ public class UnreconciledMutationsTest
// Verify mutation is present
Offsets.Mutable beforeRemoval = new Offsets.Mutable(new
CoordinatorLogId(1, 1));
- boolean foundBefore = unreconciled.collect(token, tableId, false,
beforeRemoval);
+ boolean foundBefore = unreconciled.collect(token, TABLE_ID, false,
beforeRemoval);
Assert.assertTrue("Should find mutation before removal", foundBefore);
- Assert.assertEquals("Should have 1 mutation before removal", 1,
beforeRemoval.offsetCount());
+ assertEquals("Should have 1 mutation before removal", 1,
beforeRemoval.offsetCount());
// Remove the mutation
unreconciled.remove(1);
// Verify mutation is gone
Offsets.Mutable afterRemoval = new Offsets.Mutable(new
CoordinatorLogId(1, 1));
- boolean foundAfter = unreconciled.collect(token, tableId, false,
afterRemoval);
+ boolean foundAfter = unreconciled.collect(token, TABLE_ID, false,
afterRemoval);
Assert.assertFalse("Should not find mutation after removal",
foundAfter);
Assert.assertTrue("Should have no mutations after removal",
afterRemoval.isEmpty());
}
@@ -282,8 +278,7 @@ public class UnreconciledMutationsTest
public void testTokenRangeCollection()
{
UnreconciledMutations unreconciled = new UnreconciledMutations();
- TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
-
+
// Create mutations with different tokens and sort them
List<Integer> keys = List.of(100, 200, 300, 400, 500);
List<Mutation> mutations = new ArrayList<>();
@@ -311,7 +306,7 @@ public class UnreconciledMutationsTest
new Bounds<>(startKey, endKey);
Offsets.Mutable rangeOffsets = new Offsets.Mutable(new
CoordinatorLogId(1, 1));
- boolean foundRange = unreconciled.collect(bounds, tableId, false,
rangeOffsets);
+ boolean foundRange = unreconciled.collect(bounds, TABLE_ID, false,
rangeOffsets);
Assert.assertTrue("Should find mutations in range", foundRange);
// Should include mutations at positions 0, 1, and 2 (first through
middle inclusive)
@@ -322,8 +317,7 @@ public class UnreconciledMutationsTest
public void testSingleTokenRangeCollectionBug()
{
UnreconciledMutations unreconciled = new UnreconciledMutations();
- TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
-
+
// Create mutations for different partition keys with different tokens
Mutation mutation1 = createMutation(100, 1000, 1);
Mutation mutation2 = createMutation(200, 2000, 2);
@@ -342,11 +336,11 @@ public class UnreconciledMutationsTest
// Test single token range collection
Offsets.Mutable offsets = new Offsets.Mutable(new CoordinatorLogId(1,
1));
Bounds<PartitionPosition> bounds = new Bounds<>(mutation1.key(),
mutation1.key());
- boolean found = unreconciled.collect(bounds, tableId, false, offsets);
+ boolean found = unreconciled.collect(bounds, TABLE_ID, false, offsets);
// This should only return mutation1
Assert.assertTrue("Should find mutations for single token range",
found);
- Assert.assertEquals("Single token range should only return 1
mutation", 1, offsets.offsetCount());
+ assertEquals("Single token range should only return 1 mutation", 1,
offsets.offsetCount());
Assert.assertTrue("Should contain mutation1 offset",
offsets.contains(1));
// other mutations should not be included
@@ -358,8 +352,7 @@ public class UnreconciledMutationsTest
public void testFullRangeCollectionWithMinimumToken()
{
UnreconciledMutations unreconciled = new UnreconciledMutations();
- TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
-
+
// Create mutations for different partition keys with different tokens
Mutation mutation1 = createMutation(100, 1000, 1);
Mutation mutation2 = createMutation(200, 2000, 2);
@@ -382,11 +375,11 @@ public class UnreconciledMutationsTest
// Test full range collection - this SHOULD return ALL mutations
Offsets.Mutable offsets = new Offsets.Mutable(new CoordinatorLogId(1,
1));
- boolean found = unreconciled.collect(fullRange, tableId, false,
offsets);
+ boolean found = unreconciled.collect(fullRange, TABLE_ID, false,
offsets);
// Full range should return ALL mutations
Assert.assertTrue("Should find mutations for full range", found);
- Assert.assertEquals("Full range should return ALL mutations", 3,
offsets.offsetCount());
+ assertEquals("Full range should return ALL mutations", 3,
offsets.offsetCount());
Assert.assertTrue("Should contain mutation1 offset",
offsets.contains(1));
Assert.assertTrue("Should contain mutation2 offset",
offsets.contains(2));
Assert.assertTrue("Should contain mutation3 offset",
offsets.contains(3));
@@ -396,8 +389,7 @@ public class UnreconciledMutationsTest
public void testSingleTokenCollectionVsFullRange()
{
UnreconciledMutations unreconciled = new UnreconciledMutations();
- TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
-
+
// Create mutations with different tokens
Mutation mutation1 = createMutation(100, 1000, 1);
Mutation mutation2 = createMutation(200, 2000, 2);
@@ -415,10 +407,10 @@ public class UnreconciledMutationsTest
// Single token collection should only return mutation1
Offsets.Mutable singleToken = new Offsets.Mutable(new
CoordinatorLogId(1, 1));
- boolean foundSingle = unreconciled.collect(targetToken, tableId,
false, singleToken);
+ boolean foundSingle = unreconciled.collect(targetToken, TABLE_ID,
false, singleToken);
Assert.assertTrue("Should find mutation for single token",
foundSingle);
- Assert.assertEquals("Single token should only return 1 mutation", 1,
singleToken.offsetCount());
+ assertEquals("Single token should only return 1 mutation", 1,
singleToken.offsetCount());
Assert.assertTrue("Should contain mutation1 offset",
singleToken.contains(1));
// This is the CRITICAL test - single token collection must NOT return
all mutations
@@ -426,4 +418,33 @@ public class UnreconciledMutationsTest
Assert.assertFalse("Single token collection should NOT contain
mutation2", singleToken.contains(2));
Assert.assertFalse("Single token collection should NOT contain
mutation3", singleToken.contains(3));
}
+
+ @Test
+ public void testLoadFromJournal()
+ {
+ MutationJournal.instance.start();
+
+ CoordinatorLogId logId = new CoordinatorLogId(1, 1);
+
+ Offsets.Mutable offsets1 = new Offsets.Mutable(logId);
+ offsets1.add(1, 2, 3, 4, 5, 6, 7);
+
+ Offsets.Mutable offsets2 = new Offsets.Mutable(logId);
+ offsets2.add(1, 2, 3, 4, 5);
+
+ Node2OffsetsMap witnessed = new Node2OffsetsMap();
+ witnessed.set(1, offsets1);
+ witnessed.set(2, offsets2);
+
+ Mutation mutation6 = createMutation(6, 6, 6);
+ Mutation mutation7 = createMutation(7, 7, 7);
+ MutationJournal.instance.write(mutation6.id(), mutation6);
+ MutationJournal.instance.write(mutation7.id(), mutation7);
+
+ Offsets.Mutable loadedOffsets = new Offsets.Mutable(logId);
+ UnreconciledMutations unreconciled =
UnreconciledMutations.loadFromJournal(witnessed, 1);
+ unreconciled.collect(mutation6.key().getToken(), TABLE_ID, false,
loadedOffsets);
+ unreconciled.collect(mutation7.key().getToken(), TABLE_ID, false,
loadedOffsets);
+ assertEquals(List.of(6, 7), loadedOffsets.asList());
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]