aratno commented on code in PR #4360:
URL: https://github.com/apache/cassandra/pull/4360#discussion_r2325409084
##########
src/java/org/apache/cassandra/replication/Node2OffsetsMap.java:
##########
@@ -25,24 +33,93 @@
*/
public class Node2OffsetsMap
{
- private final Int2ObjectHashMap<Offsets> map = new Int2ObjectHashMap<>();
+ private final Int2ObjectHashMap<Offsets.Mutable> offsetstMap;
Review Comment:
Nit - typo: `offsetstMap` should be `offsetsMap`
##########
src/java/org/apache/cassandra/replication/Shard.java:
##########
@@ -18,72 +18,93 @@
package org.apache.cassandra.replication;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.function.BiConsumer;
-import java.util.function.IntSupplier;
+import java.util.function.LongSupplier;
import javax.annotation.Nonnull;
import com.google.common.base.Preconditions;
import org.agrona.collections.IntArrayList;
+import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.replication.CoordinatorLog.CoordinatorLogPrimary;
+import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.tcm.ClusterMetadata;
-import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.membership.NodeId;
import org.jctools.maps.NonBlockingHashMapLong;
+import static java.lang.String.format;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+
public class Shard
{
- private final String keyspace;
- private final Range<Token> tokenRange;
- private final int localHostId;
+ final int localNodeId;
+ final String keyspace;
+ final Range<Token> range;
+
private final Participants participants;
- private final Epoch sinceEpoch;
+ private final LongSupplier logIdProvider;
private final BiConsumer<Shard, CoordinatorLog> onNewLog;
private final NonBlockingHashMapLong<CoordinatorLog> logs;
- // TODO (expected): add support for log rotation
- private final CoordinatorLog.CoordinatorLogPrimary currentLocalLog;
-
- private final List<Subscriber> subscribers = new ArrayList<>();
-
- public interface Subscriber
- {
- default void onLogCreation(CoordinatorLog log) {}
- default void onSubscribe(CoordinatorLog currentLog) {}
- }
+ private volatile CoordinatorLogPrimary currentLocalLog;
- Shard(String keyspace,
- Range<Token> tokenRange,
- int localHostId,
+ Shard(int localNodeId,
+ String keyspace,
+ Range<Token> range,
Participants participants,
- Epoch sinceEpoch,
- IntSupplier logIdProvider,
+ List<CoordinatorLog> logs,
+ LongSupplier logIdProvider,
BiConsumer<Shard, CoordinatorLog> onNewLog)
{
- Preconditions.checkArgument(participants.contains(localHostId));
+ Preconditions.checkArgument(participants.contains(localNodeId));
+ this.localNodeId = localNodeId;
this.keyspace = keyspace;
- this.tokenRange = tokenRange;
- this.localHostId = localHostId;
+ this.range = range;
this.participants = participants;
- this.sinceEpoch = sinceEpoch;
+ this.logIdProvider = logIdProvider;
this.logs = new NonBlockingHashMapLong<>();
this.onNewLog = onNewLog;
- this.currentLocalLog = startNewLog(localHostId,
logIdProvider.getAsInt(), participants);
- CoordinatorLogId logId = currentLocalLog.logId;
- Preconditions.checkArgument(!logId.isNone());
- logs.put(logId.asLong(), currentLocalLog);
+ for (CoordinatorLog log : logs)
+ {
+ this.logs.put(log.logId.asLong(), log);
+ onNewLog.accept(Shard.this, log);
+ }
+ this.currentLocalLog = createNewPrimayLog();
+ }
+
+ Shard(int localNodeId, String keyspace, Range<Token> range, Participants
participants, LongSupplier logIdProvider, BiConsumer<Shard, CoordinatorLog>
onNewLog)
+ {
+ this(localNodeId, keyspace, range, participants,
Collections.emptyList(), logIdProvider, onNewLog);
}
MutationId nextId()
{
- return currentLocalLog.nextId();
+ MutationId nextId = currentLocalLog.nextId();
+ if (nextId != null)
+ return nextId;
+ return maybeRotateLocalLogAndGetNextId();
+ }
+
+ // if ids overflow, we need to rotate the local log
+ synchronized private MutationId maybeRotateLocalLogAndGetNextId()
+ {
+ MutationId nextId = currentLocalLog.nextId();
+ if (nextId != null) // another thread got to rotate before us
+ return nextId;
+ currentLocalLog = createNewPrimayLog();
Review Comment:
Add debug logging on rotation?
We should consider dispatching rotate manually in the fuzz tests, or
supporting a configurable sequence ID limit that's lower than
Integer.MAX_VALUE. Otherwise rotation would not be exercised in fuzz tests that
lasted any reasonable duration.
If a log is full, the next attempt to get an ID will block on persistence to
the system table, and concurrent attempts will enqueue behind that attempt. We
could schedule the rotation a bit early and only block once we're truly out of
IDs.
##########
src/java/org/apache/cassandra/replication/CoordinatorLog.java:
##########
@@ -343,16 +402,119 @@ private long nextSequenceId()
static class CoordinatorLogReplica extends CoordinatorLog
{
- CoordinatorLogReplica(int localHostId, CoordinatorLogId logId,
Participants participants)
+ CoordinatorLogReplica(
+ String keyspace, Range<Token> range, int localNodeId,
CoordinatorLogId logId, Participants participants,
+ Node2OffsetsMap witnessedOffsets, Node2OffsetsMap persistedOffsets)
{
- super(localHostId, logId, participants);
+ super(keyspace, range, localNodeId, logId, participants,
witnessedOffsets, persistedOffsets);
+ }
+
+ CoordinatorLogReplica(String keyspace, Range<Token> range, int
localNodeId, CoordinatorLogId logId, Participants participants)
+ {
+ super(keyspace, range, localNodeId, logId, participants);
}
@Override
- void receivedWriteResponse(ShortMutationId mutationId, int fromHostId)
+ void receivedWriteResponse(ShortMutationId mutationId, int fromNodeId)
{
// no-op
}
}
+ /*
+ * Persist to / load from system table.
+ */
+
+ private static final String INSERT_QUERY =
+ format("INSERT INTO %s.%s (keyspace_name, range_start, range_end,
host_id, host_log_id, participants, witnessed_offsets, persisted_offsets) "
+ + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
+ SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.COORDINATOR_LOGS);
+
+ void persistToSystemTable()
+ {
+ Map<Integer, List<Integer>> witnessed = new Int2ObjectHashMap<>();
+ Map<Integer, List<Integer>> persisted = new Int2ObjectHashMap<>();
+
+ lock.readLock().lock();
+ try
+ {
+ witnessedOffsets.convertToPrimitiveMap(witnessed);
+ persistedOffsets.convertToPrimitiveMap(persisted);
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ executeInternal(INSERT_QUERY, keyspace, range.left.toString(),
range.right.toString(), logId.hostId,
+ logId.hostLogId, participants.asSet(), witnessed,
persisted);
+ }
+
+ void updateLogsInSystemTable()
+ {
+ Offsets.Mutable localWitnessed;
+ Map<Integer, List<Integer>> witnessed = new Int2ObjectHashMap<>();
+ Map<Integer, List<Integer>> persisted = new Int2ObjectHashMap<>();
+
+ lock.readLock().lock();
+ try
+ {
+ localWitnessed =
Offsets.Mutable.copy(witnessedOffsets.get(localNodeId));
+
+ witnessedOffsets.convertToPrimitiveMap(witnessed);
+ persistedOffsets.convertToPrimitiveMap(persisted);
Review Comment:
Would be good to make this incremental in the future, since every witnessed
offset will eventually be persisted and reconciled
##########
src/java/org/apache/cassandra/replication/MutationTrackingService.java:
##########
@@ -264,41 +276,44 @@ private KeyspaceShards getOrCreateShards(String keyspace)
ClusterMetadata csm = ClusterMetadata.current();
KeyspaceMetadata ksm = csm.schema.getKeyspaceMetadata(keyspace);
- return keyspaceShards.computeIfAbsent(keyspace, ignore ->
KeyspaceShards.make(ksm, csm, this::nextHostLogId, this::onNewLog));
+ return keyspaceShards.computeIfAbsent(keyspace, ignore ->
KeyspaceShards.make(ksm, csm, this::nextLogId, this::onNewLog));
+ }
+
+ private long nextLogId()
+ {
+ NodeId nodeId = ClusterMetadata.current().myNodeId();
+ Preconditions.checkNotNull(nodeId);
+ return CoordinatorLogId.asLong(nodeId.id(), nextHostLogId());
}
- // TODO (expected): durability
- int nextHostLogId()
+ /*
+ * Allocate and persist the next host log id.
+ * We only do this on startup and when rotating logs.
+ */
+ private int nextHostLogId()
{
- return nextHostLogId.incrementAndGet();
+ int nextHostLogId = ++prevHostLogId;
+ persistHostLogIdToSystemTable(nextHostLogId);
+ return nextHostLogId;
}
- private final AtomicInteger nextHostLogId = new AtomicInteger();
+ private int prevHostLogId;
- public boolean isDurablyReconciled(String keyspace,
ImmutableCoordinatorLogOffsets logOffsets)
+ public boolean isDurablyReconciled(ImmutableCoordinatorLogOffsets
logOffsets)
{
// Could pass through SSTable bounds to exclude shards for
non-overlapping ranges, but this will mostly be
// called on flush for L0 SSTables with wide bounds.
-
- KeyspaceShards shards = keyspaceShards.get(keyspace);
- if (shards == null)
- {
- logger.debug("Could not find shards for keyspace {}", keyspace);
- return false;
- }
-
for (Long logId : logOffsets)
{
- CoordinatorLogId coordinatorLogId = new CoordinatorLogId(logId);
- CoordinatorLog log = shards.logs.get(coordinatorLogId);
- if (log == null)
+ Shard shard = log2ShardMap.get(new CoordinatorLogId(logId));
+ if (shard == null)
{
- logger.warn("Could not determine lifecycle for unknown logId
{}, not marking as durably reconciled", coordinatorLogId);
+ logger.debug("Could not find shard for logId {}", logId);
Review Comment:
Feels like this should be an assertion failure for now? We should never have
a local SSTable with an unknown logId.
##########
src/java/org/apache/cassandra/replication/Shard.java:
##########
@@ -143,39 +164,31 @@ List<InetAddressAndPort> remoteReplicas()
for (int i = 0, size = participants.size(); i < size; ++i)
{
int hostId = participants.get(i);
- if (hostId != localHostId)
+ if (hostId != localNodeId)
replicas.add(ClusterMetadata.current().directory.endpoint(new
NodeId(hostId)));
}
return replicas;
}
+ boolean isDurablyReconciled(long logId, CoordinatorLogOffsets<?>
logOffsets)
+ {
+ return logs.get(logId).isDurablyReconciled(logOffsets);
Review Comment:
Update CoordinatorLog#isDurablyReconciled to use persistedOffsets?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]