This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-45-mutation-tracking by 
this push:
     new bd5a657548 Implement Shard and CoordinatorLog metadata durability
bd5a657548 is described below

commit bd5a657548f27a0b6a5f08f1452a39b4f14064cb
Author: Aleksey Yeschenko <[email protected]>
AuthorDate: Thu Jul 17 13:00:33 2025 +0100

    Implement Shard and CoordinatorLog metadata durability
    
    patch by Aleksey Yeschenko; reviewed by Abe Ratnofsky for CASSANDRA-20882
---
 .../org/apache/cassandra/db/SystemKeyspace.java    |  80 ++++--
 .../cassandra/io/sstable/format/SSTableWriter.java |   2 +-
 .../cassandra/replication/BroadcastLogOffsets.java |  12 +-
 .../cassandra/replication/CoordinatorLog.java      | 304 ++++++++++++++++-----
 .../replication/MutationTrackingService.java       | 227 ++++++++-------
 .../cassandra/replication/Node2OffsetsMap.java     |  89 +++++-
 .../org/apache/cassandra/replication/Offsets.java  |  33 +++
 .../apache/cassandra/replication/Participants.java |  20 ++
 .../org/apache/cassandra/replication/Shard.java    | 188 ++++++++-----
 .../replication/UnreconciledMutations.java         |  38 +++
 .../test/log/SystemKeyspaceStorageTest.java        |   4 +-
 .../cassandra/cql3/SystemKeyspaceQueryTest.java    |   4 +-
 .../replication/CoordinatorLogOffsetsTest.java     |   8 +-
 .../cassandra/replication/CoordinatorLogTest.java  | 123 +++++++--
 .../apache/cassandra/replication/OffsetsTest.java  |  12 +-
 .../apache/cassandra/replication/ShardTest.java    | 112 ++++++++
 .../replication/UnreconciledMutationsTest.java     | 115 ++++----
 17 files changed, 1043 insertions(+), 328 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java 
b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index a15b46665f..aa3834c4ff 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -188,7 +188,10 @@ public final class SystemKeyspace
     public static final String REPAIRS = "repairs";
     public static final String TOP_PARTITIONS = "top_partitions";
     public static final String METADATA_LOG = "local_metadata_log";
-    public static final String SNAPSHOT_TABLE_NAME = "metadata_snapshots";
+    public static final String METADATA_SNAPSHOTS = "metadata_snapshots";
+    public static final String HOST_LOG_ID = "host_log_id";
+    public static final String SHARDS = "shards";
+    public static final String COORDINATOR_LOGS = "coordinator_logs";
 
     /**
      * By default the system keyspace tables should be stored in a single data 
directory to allow the server
@@ -223,14 +226,16 @@ public final class SystemKeyspace
         TABLE_ESTIMATES_TYPE_LOCAL_PRIMARY, AVAILABLE_RANGES_V2, 
TRANSFERRED_RANGES_V2, VIEW_BUILDS_IN_PROGRESS,
         BUILT_VIEWS, PREPARED_STATEMENTS, REPAIRS, TOP_PARTITIONS, 
LEGACY_PEERS, LEGACY_PEER_EVENTS,
         LEGACY_TRANSFERRED_RANGES, LEGACY_AVAILABLE_RANGES, 
LEGACY_SIZE_ESTIMATES, LEGACY_SSTABLE_ACTIVITY,
-        METADATA_LOG, SNAPSHOT_TABLE_NAME, CONSENSUS_MIGRATION_STATE);
+        METADATA_LOG, METADATA_SNAPSHOTS, CONSENSUS_MIGRATION_STATE,
+        HOST_LOG_ID, SHARDS, COORDINATOR_LOGS);
 
     public static final Set<String> TABLE_NAMES = ImmutableSet.of(
         BATCHES, PAXOS, PAXOS_REPAIR_HISTORY, BUILT_INDEXES, LOCAL, PEERS_V2, 
PEER_EVENTS_V2,
         COMPACTION_HISTORY, SSTABLE_ACTIVITY_V2, TABLE_ESTIMATES, 
AVAILABLE_RANGES_V2, TRANSFERRED_RANGES_V2, VIEW_BUILDS_IN_PROGRESS,
         BUILT_VIEWS, PREPARED_STATEMENTS, REPAIRS, TOP_PARTITIONS, 
LEGACY_PEERS, LEGACY_PEER_EVENTS,
         LEGACY_TRANSFERRED_RANGES, LEGACY_AVAILABLE_RANGES, 
LEGACY_SIZE_ESTIMATES, LEGACY_SSTABLE_ACTIVITY,
-        METADATA_LOG, SNAPSHOT_TABLE_NAME, CONSENSUS_MIGRATION_STATE);
+        METADATA_LOG, METADATA_SNAPSHOTS, CONSENSUS_MIGRATION_STATE,
+        HOST_LOG_ID, SHARDS, COORDINATOR_LOGS);
 
     public static final TableMetadata Batches =
         parse(BATCHES,
@@ -516,7 +521,7 @@ public final class SystemKeyspace
           + "cfids set<uuid>, "
           + "PRIMARY KEY (parent_id))").build();
 
-    public static final TableMetadata LocalMetadataLog =
+    private static final TableMetadata LocalMetadataLog =
         parse(METADATA_LOG,
               "Local Metadata Log",
               "CREATE TABLE %s ("
@@ -530,13 +535,49 @@ public final class SystemKeyspace
                                                           
"compaction_window_size","1")))
         .build();
 
-    public static final TableMetadata Snapshots = parse(SNAPSHOT_TABLE_NAME,
-                                                        "ClusterMetadata 
snapshots",
-                                                        "CREATE TABLE IF NOT 
EXISTS %s (" +
-                                                        "epoch bigint PRIMARY 
KEY," +
-                                                        "snapshot blob)")
-                                                  
.partitioner(MetaStrategy.partitioner)
-                                                  .build();
+    private static final TableMetadata Snapshots =
+        parse(METADATA_SNAPSHOTS,
+              "ClusterMetadata snapshots",
+              "CREATE TABLE IF NOT EXISTS %s (" +
+              "epoch bigint PRIMARY KEY," +
+              "snapshot blob)")
+        .partitioner(MetaStrategy.partitioner)
+        .build();
+
+    private static final TableMetadata HostLogId =
+        parse(HOST_LOG_ID,
+              "mutation tracking last used host log id",
+              "CREATE TABLE %s ("
+              + "key text,"
+              + "host_log_id int,"
+              + "PRIMARY KEY ((key)))")
+              .build();
+
+    private static final TableMetadata Shards =
+        parse(SHARDS,
+              "mutation tracking shards",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "range_start text,"
+              + "range_end text,"
+              + "participants frozen<set<int>>,"
+              + "PRIMARY KEY ((keyspace_name, range_start, range_end)))")
+              .build();
+
+    private static final TableMetadata CoordinatorLogs =
+        parse(COORDINATOR_LOGS,
+              "mutation tracking coordinator logs",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "range_start text,"
+              + "range_end text,"
+              + "host_id int,"
+              + "host_log_id int,"
+              + "participants frozen<set<int>>,"
+              + "witnessed_offsets map<int, frozen<list<int>>>,"
+              + "persisted_offsets map<int, frozen<list<int>>>,"
+              + "PRIMARY KEY ((keyspace_name, range_start, range_end), 
host_id, host_log_id))")
+              .build();
 
     @Deprecated(since = "4.0")
     private static final TableMetadata LegacyPeers =
@@ -631,7 +672,10 @@ public final class SystemKeyspace
                          TopPartitions,
                          LocalMetadataLog,
                          Snapshots,
-                         ConsensusMigrationState);
+                         ConsensusMigrationState,
+                         HostLogId,
+                         Shards,
+                         CoordinatorLogs);
     }
 
     private static volatile Map<TableId, Pair<CommitLogPosition, Long>> 
truncationRecords;
@@ -2127,16 +2171,16 @@ public final class SystemKeyspace
     {
         Preconditions.checkArgument(epoch.isAfter(Epoch.FIRST), "Cannot store 
a snapshot for an epoch less than " + Epoch.FIRST.getEpoch());
         logger.info("Storing snapshot of cluster metadata at epoch {}", epoch);
-        String query = String.format("INSERT INTO %s.%s (epoch, snapshot) 
VALUES (?, ?)", SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME);
+        String query = String.format("INSERT INTO %s.%s (epoch, snapshot) 
VALUES (?, ?)", SchemaConstants.SYSTEM_KEYSPACE_NAME, METADATA_SNAPSHOTS);
         executeInternal(query, epoch.getEpoch(), snapshot);
-        forceBlockingFlush(SNAPSHOT_TABLE_NAME);
+        forceBlockingFlush(METADATA_SNAPSHOTS);
     }
 
     public static ByteBuffer getSnapshot(Epoch epoch)
     {
         Preconditions.checkArgument(epoch.isAfter(Epoch.FIRST), "Cannot 
retrieve a snapshot for an epoch less than " + Epoch.FIRST.getEpoch());
         logger.info("Getting snapshot of epoch = {}", epoch);
-        String query = String.format("SELECT snapshot FROM %s.%s WHERE epoch = 
?", SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME);
+        String query = String.format("SELECT snapshot FROM %s.%s WHERE epoch = 
?", SchemaConstants.SYSTEM_KEYSPACE_NAME, METADATA_SNAPSHOTS);
         UntypedResultSet res = executeInternal(query, epoch.getEpoch());
         if (res == null || res.isEmpty())
             return null;
@@ -2165,7 +2209,7 @@ public final class SystemKeyspace
     {
         // during gossip upgrade we have epoch = Long.MIN_VALUE + 1 (and the 
reverse partitioner doesn't support negative keys)
         search = search.isBefore(Epoch.EMPTY) ? Epoch.EMPTY : search;
-        String query = String.format("SELECT snapshot FROM %s.%s WHERE 
token(epoch) >= token(?) LIMIT 1", SchemaConstants.SYSTEM_KEYSPACE_NAME, 
SNAPSHOT_TABLE_NAME);
+        String query = String.format("SELECT snapshot FROM %s.%s WHERE 
token(epoch) >= token(?) LIMIT 1", SchemaConstants.SYSTEM_KEYSPACE_NAME, 
METADATA_SNAPSHOTS);
         UntypedResultSet res = executeInternal(query, search.getEpoch());
         if (res != null && !res.isEmpty())
             return res.one().getBytes("snapshot").duplicate();
@@ -2176,7 +2220,7 @@ public final class SystemKeyspace
     {
         // during gossip upgrade we have epoch = Long.MIN_VALUE + 1 (and the 
reverse partitioner doesn't support negative keys)
         search = search.isBefore(Epoch.EMPTY) ? Epoch.EMPTY : search;
-        String query = String.format("SELECT epoch FROM %s.%s WHERE 
token(epoch) < token(?)", SchemaConstants.SYSTEM_KEYSPACE_NAME, 
SNAPSHOT_TABLE_NAME);
+        String query = String.format("SELECT epoch FROM %s.%s WHERE 
token(epoch) < token(?)", SchemaConstants.SYSTEM_KEYSPACE_NAME, 
METADATA_SNAPSHOTS);
         UntypedResultSet res = executeInternal(query, search.getEpoch());
         if (res == null)
             return Collections.emptyList();
@@ -2189,7 +2233,7 @@ public final class SystemKeyspace
      */
     public static ByteBuffer findLastSnapshot()
     {
-        String query = String.format("SELECT snapshot FROM %s.%s LIMIT 1", 
SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME);
+        String query = String.format("SELECT snapshot FROM %s.%s LIMIT 1", 
SchemaConstants.SYSTEM_KEYSPACE_NAME, METADATA_SNAPSHOTS);
         UntypedResultSet res = executeInternal(query);
         if (res != null && !res.isEmpty())
             return res.one().getBytes("snapshot").duplicate();
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 0190282fc7..bc183a56fa 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -343,7 +343,7 @@ public abstract class SSTableWriter extends SSTable 
implements Transactional
         if (metadata().replicationType().isTracked() && repairedAt == 
ActiveRepairService.UNREPAIRED_SSTABLE)
         {
             Preconditions.checkState(Objects.equals(pendingRepair, 
ActiveRepairService.NO_PENDING_REPAIR));
-            if 
(MutationTrackingService.instance.isDurablyReconciled(getKeyspaceName(), 
coordinatorLogOffsets))
+            if 
(MutationTrackingService.instance.isDurablyReconciled(coordinatorLogOffsets))
             {
                 repairedAt = Clock.Global.currentTimeMillis();
                 logger.debug("Marking SSTable {} as reconciled with repairedAt 
{}", descriptor, repairedAt);
diff --git a/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java 
b/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java
index b3fc118613..ce8fe512c9 100644
--- a/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java
+++ b/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java
@@ -41,12 +41,14 @@ public class BroadcastLogOffsets
     private final String keyspace;
     private final Range<Token> range;
     private final List<Offsets.Immutable> replicatedOffsets;
+    private final boolean durable;
 
-    public BroadcastLogOffsets(String keyspace, Range<Token> range, 
List<Offsets.Immutable> offsets)
+    public BroadcastLogOffsets(String keyspace, Range<Token> range, 
List<Offsets.Immutable> offsets, boolean durable)
     {
         this.keyspace = keyspace;
         this.range = range;
         this.replicatedOffsets = offsets;
+        this.durable = durable;
     }
 
     boolean isEmpty()
@@ -57,7 +59,7 @@ public class BroadcastLogOffsets
     @Override
     public String toString()
     {
-        return "ShardReplicatedOffsets{" + keyspace + ", " + range + ", " + 
replicatedOffsets + '}';
+        return "ShardReplicatedOffsets{" + keyspace + ", " + range + ", " + 
replicatedOffsets + ", " + durable + '}';
     }
 
     public static final IVerbHandler<BroadcastLogOffsets> verbHandler = 
message -> {
@@ -66,6 +68,7 @@ public class BroadcastLogOffsets
         
MutationTrackingService.instance.updateReplicatedOffsets(replicatedOffsets.keyspace,
                                                                  
replicatedOffsets.range,
                                                                  
replicatedOffsets.replicatedOffsets,
+                                                                 
replicatedOffsets.durable,
                                                                  
message.from());
     };
 
@@ -79,6 +82,7 @@ public class BroadcastLogOffsets
             out.writeInt(status.replicatedOffsets.size());
             for (Offsets.Immutable logOffsets : status.replicatedOffsets)
                 Offsets.serializer.serialize(logOffsets, out, version);
+            out.writeBoolean(status.durable);
         }
 
         @Override
@@ -90,7 +94,8 @@ public class BroadcastLogOffsets
             List<Offsets.Immutable> replicatedOffsets = new ArrayList<>(count);
             for (int i = 0; i < count; ++i)
                 replicatedOffsets.add(Offsets.serializer.deserialize(in, 
version));
-            return new BroadcastLogOffsets(keyspace, range, replicatedOffsets);
+            boolean durable = in.readBoolean();
+            return new BroadcastLogOffsets(keyspace, range, replicatedOffsets, 
durable);
         }
 
         @Override
@@ -102,6 +107,7 @@ public class BroadcastLogOffsets
             size += 
TypeSizes.sizeof(replicatedOffsets.replicatedOffsets.size());
             for (Offsets.Immutable logOffsets : 
replicatedOffsets.replicatedOffsets)
                 size += Offsets.serializer.serializedSize(logOffsets, version);
+            size += TypeSizes.sizeof(replicatedOffsets.durable);
             return size;
         }
     };
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java 
b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index 13228256e2..312c355237 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -17,7 +17,11 @@
  */
 package org.apache.cassandra.replication;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -27,74 +31,101 @@ import javax.annotation.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import org.agrona.collections.Int2ObjectHashMap;
 import org.agrona.collections.IntArrayList;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.tcm.ClusterMetadata;
 
+import static java.lang.String.format;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.apache.cassandra.replication.Node2OffsetsMap.forParticipants;
+import static 
org.apache.cassandra.replication.Node2OffsetsMap.fromPrimitiveMap;
 import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 
 public abstract class CoordinatorLog
 {
     private static final Logger logger = 
LoggerFactory.getLogger(CoordinatorLog.class);
 
-    protected final int localHostId;
+    protected final int localNodeId;
+    protected final String keyspace;
+    protected final Range<Token> range;
     protected final CoordinatorLogId logId;
     protected final Participants participants;
 
+    protected final Node2OffsetsMap witnessedOffsets;
+    protected final Node2OffsetsMap persistedOffsets;
+
     protected final UnreconciledMutations unreconciledMutations;
-    protected final Offsets.Mutable[] witnessedOffsets;
     protected final Offsets.Mutable reconciledOffsets;
+    protected final Offsets.Mutable reconciledPersistedOffsets;
 
     protected final ReadWriteLock lock;
 
-    abstract void receivedWriteResponse(ShortMutationId mutationId, int 
fromHostId);
+    abstract void receivedWriteResponse(ShortMutationId mutationId, int 
fromNodeId);
 
-    CoordinatorLog(int localHostId, CoordinatorLogId logId, Participants 
participants)
+    CoordinatorLog(String keyspace,
+                   Range<Token> range,
+                   int localNodeId,
+                   CoordinatorLogId logId,
+                   Participants participants,
+                   Node2OffsetsMap witnessedOffsets,
+                   Node2OffsetsMap persistedOffsets,
+                   UnreconciledMutations unreconciledMutations)
     {
-        this.localHostId = localHostId;
+        this.localNodeId = localNodeId;
+        this.keyspace = keyspace;
+        this.range = range;
         this.logId = logId;
         this.participants = participants;
-
-        this.unreconciledMutations = new UnreconciledMutations();
+        this.unreconciledMutations = unreconciledMutations;
+        this.witnessedOffsets = witnessedOffsets;
+        this.reconciledOffsets = witnessedOffsets.intersection();
+        this.persistedOffsets = persistedOffsets;
+        this.reconciledPersistedOffsets = persistedOffsets.intersection();
         this.lock = new ReentrantReadWriteLock();
+    }
 
-        Offsets.Mutable[] ids = new Offsets.Mutable[participants.size()];
-        for (int i = 0; i < participants.size(); i++)
-            ids[i] = new Offsets.Mutable(logId);
+    CoordinatorLog(String keyspace, Range<Token> range, int localNodeId, 
CoordinatorLogId logId, Participants participants)
+    {
+        this(keyspace, range, localNodeId, logId, participants, 
forParticipants(logId, participants), forParticipants(logId, participants), new 
UnreconciledMutations());
+    }
 
-        witnessedOffsets = ids;
-        reconciledOffsets = new Offsets.Mutable(logId);
+    static CoordinatorLog create(String keyspace, Range<Token> range, int 
localNodeId, CoordinatorLogId id, Participants participants)
+    {
+        return id.hostId == localNodeId ? new CoordinatorLogPrimary(keyspace, 
range, localNodeId, id, participants)
+                                        : new CoordinatorLogReplica(keyspace, 
range, localNodeId, id, participants);
     }
 
-    static CoordinatorLog create(int localHostId, CoordinatorLogId id, 
Participants participants)
+    static CoordinatorLog recreate(
+        String keyspace, Range<Token> range, int localNodeId, CoordinatorLogId 
id, Participants participants,
+        Node2OffsetsMap witnessedOffsets, Node2OffsetsMap persistedOffsets, 
UnreconciledMutations unreconciledMutations)
     {
-        return id.hostId == localHostId ? new 
CoordinatorLogPrimary(localHostId, id, participants)
-                                        : new 
CoordinatorLogReplica(localHostId, id, participants);
+        return id.hostId == localNodeId ? new CoordinatorLogPrimary(keyspace, 
range, localNodeId, id, participants, witnessedOffsets, persistedOffsets, 
unreconciledMutations)
+                                        : new CoordinatorLogReplica(keyspace, 
range, localNodeId, id, participants, witnessedOffsets, persistedOffsets, 
unreconciledMutations);
     }
 
-    void updateReplicatedOffsets(Offsets offsets, int onHostId)
+    void updateReplicatedOffsets(Offsets offsets, boolean persisted, int 
onNodeId)
     {
         lock.writeLock().lock();
         try
         {
-            get(onHostId).addAll(offsets, (ignore, start, end) ->
-            {
-                for (int offset = start; offset <= end; ++offset)
-                {
-                    // TODO (desired): use the fact that Offsets are ordered 
to optimise this look up
-                    if (othersWitnessed(offset, onHostId))
-                    {
-                        reconciledOffsets.add(offset);
-                        unreconciledMutations.remove(offset);
-                    }
-                }
-            });
+            if (persisted)
+                updatePersistedReplicatedOffsets(offsets, onNodeId);
+            else
+                updateWitnessedReplicatedOffsets(offsets, onNodeId);
         }
         finally
         {
@@ -102,13 +133,37 @@ public abstract class CoordinatorLog
         }
     }
 
+    private void updateWitnessedReplicatedOffsets(Offsets offsets, int 
onNodeId)
+    {
+        witnessedOffsets.get(onNodeId).addAll(offsets, (ignore, start, end) ->
+        {
+            for (int offset = start; offset <= end; ++offset)
+            {
+                // TODO (desired): use the fact that Offsets are ordered to 
optimise this look up
+                if (othersWitnessed(offset, onNodeId))
+                {
+                    reconciledOffsets.add(offset);
+                    unreconciledMutations.remove(offset);
+                }
+            }
+        });
+    }
+
+    private void updatePersistedReplicatedOffsets(Offsets offsets, int 
onNodeId)
+    {
+        persistedOffsets.get(onNodeId).addAll(offsets);
+        reconciledPersistedOffsets.addAll(persistedOffsets.intersection());
+    }
+
     @Nullable
-    Offsets.Immutable collectReplicatedOffsets()
+    Offsets.Immutable collectReplicatedOffsets(boolean persisted)
     {
         lock.readLock().lock();
         try
         {
-            Offsets offsets = 
witnessedOffsets[participants.indexOf(localHostId)];
+            Offsets offsets = persisted
+                            ? persistedOffsets.get(localNodeId)
+                            : witnessedOffsets.get(localNodeId);
             return offsets.isEmpty() ? null : Offsets.Immutable.copy(offsets);
         }
         finally
@@ -122,7 +177,7 @@ public abstract class CoordinatorLog
         lock.writeLock().lock();
         try
         {
-            if (getLocal().contains(mutation.id().offset()))
+            if 
(witnessedOffsets.get(localNodeId).contains(mutation.id().offset()))
                 return false; // already witnessed; shouldn't get to this path 
often (duplicate mutation)
 
             unreconciledMutations.startWriting(mutation);
@@ -143,7 +198,7 @@ public abstract class CoordinatorLog
         {
             int offset = mutation.id().offset();
             // we've raced with another write, no need to do anything else
-            if (!getLocal().add(offset))
+            if (!witnessedOffsets.get(localNodeId).add(offset))
                 return;
 
             unreconciledMutations.finishWriting(mutation);
@@ -160,12 +215,12 @@ public abstract class CoordinatorLog
         }
     }
 
-    private boolean othersWitnessed(int offset, int exceptHostId)
+    private boolean othersWitnessed(int offset, int exceptNodeId)
     {
         for (int i = 0; i < participants.size(); ++i)
         {
-            int hostId = participants.get(i);
-            if (hostId != exceptHostId && !get(hostId).contains(offset))
+            int nodeId = participants.get(i);
+            if (nodeId != exceptNodeId && 
!witnessedOffsets.get(nodeId).contains(offset))
                 return false;
         }
         return true;
@@ -173,7 +228,7 @@ public abstract class CoordinatorLog
 
     protected boolean remoteReplicasWitnessed(int offset)
     {
-        return othersWitnessed(offset, localHostId);
+        return othersWitnessed(offset, localNodeId);
     }
 
     /**
@@ -220,7 +275,7 @@ public abstract class CoordinatorLog
         lock.readLock().lock();
         try
         {
-            into.add(Offsets.Immutable.difference(remoteOffsets, getLocal()));
+            into.add(Offsets.Immutable.difference(remoteOffsets, 
witnessedOffsets.get(localNodeId)));
         }
         finally
         {
@@ -233,9 +288,11 @@ public abstract class CoordinatorLog
         lock.readLock().lock();
         try
         {
-            remoteNodeIds.forEachInt(remoteNodeId -> {
-                Offsets missing = 
Offsets.Immutable.difference(get(remoteNodeId), localOffsets);
-                if (!missing.isEmpty()) into.add(remoteNodeId, missing);
+            remoteNodeIds.forEachInt(remoteNodeId ->
+            {
+                Offsets missing = 
Offsets.Immutable.difference(witnessedOffsets.get(remoteNodeId), localOffsets);
+                if (!missing.isEmpty())
+                    into.add(remoteNodeId, missing);
             });
         }
         finally
@@ -244,18 +301,12 @@ public abstract class CoordinatorLog
         }
     }
 
-    protected Offsets.Mutable get(int hostId)
-    {
-        return witnessedOffsets[participants.indexOf(hostId)];
-    }
-
     boolean isDurablyReconciled(CoordinatorLogOffsets<?> logOffsets)
     {
         lock.readLock().lock();
         try
         {
-            // TODO: reconciledOffsets not necessarily durable, update once 
durability is implemented
-            Offsets.RangeIterator durablyReconciled = 
reconciledOffsets.rangeIterator();
+            Offsets.RangeIterator durablyReconciled = 
reconciledPersistedOffsets.rangeIterator();
             Offsets.RangeIterator difference = 
Offsets.difference(logOffsets.offsets(logId.asLong()).rangeIterator(), 
durablyReconciled);
             return !difference.tryAdvance();
         }
@@ -265,17 +316,12 @@ public abstract class CoordinatorLog
         }
     }
 
-    protected Offsets.Mutable getLocal()
-    {
-        return witnessedOffsets[participants.indexOf(localHostId)];
-    }
-
     @Override
     public String toString()
     {
         return "CoordinatorLog{" +
                "logId=" + logId +
-               ", localHostId=" + localHostId +
+               ", localNodeId=" + localNodeId +
                ", participants=" + participants +
                '}';
     }
@@ -284,24 +330,31 @@ public abstract class CoordinatorLog
     {
         private final AtomicLong sequenceId = new AtomicLong(-1);
 
-        CoordinatorLogPrimary(int localHostId, CoordinatorLogId logId, 
Participants participants)
+        CoordinatorLogPrimary(
+            String keyspace, Range<Token> range, int localNodeId, 
CoordinatorLogId logId, Participants participants,
+            Node2OffsetsMap witnessedOffsets, Node2OffsetsMap 
persistedOffsets, UnreconciledMutations unreconciledMutations)
         {
-            super(localHostId, logId, participants);
+            super(keyspace, range, localNodeId, logId, participants, 
witnessedOffsets, persistedOffsets, unreconciledMutations);
+        }
+
+        CoordinatorLogPrimary(String keyspace, Range<Token> range, int 
localNodeId, CoordinatorLogId logId, Participants participants)
+        {
+            super(keyspace, range, localNodeId, logId, participants);
         }
 
         @Override
-        void receivedWriteResponse(ShortMutationId mutationId, int fromHostId)
+        void receivedWriteResponse(ShortMutationId mutationId, int fromNodeId)
         {
             Preconditions.checkArgument(!mutationId.isNone());
-            Preconditions.checkArgument(!Objects.equals(fromHostId, 
ClusterMetadata.current().myNodeId().id()));
-            logger.trace("witnessed remote mutation {} from {}", mutationId, 
fromHostId);
+            Preconditions.checkArgument(!Objects.equals(fromNodeId, 
ClusterMetadata.current().myNodeId().id()));
+            logger.trace("witnessed remote mutation {} from {}", mutationId, 
fromNodeId);
             lock.writeLock().lock();
             try
             {
-                if (!get(fromHostId).add(mutationId.offset()))
+                if (!witnessedOffsets.get(fromNodeId).add(mutationId.offset()))
                     return; // already witnessed; very uncommon but possible 
path
 
-                if (!getLocal().contains(mutationId.offset()))
+                if 
(!witnessedOffsets.get(localNodeId).contains(mutationId.offset()))
                     return; // local host hasn't witnessed yet -> no cleanup 
needed
 
                 if (remoteReplicasWitnessed(mutationId.offset()))
@@ -318,9 +371,13 @@ public abstract class CoordinatorLog
             }
         }
 
+        @Nullable
         MutationId nextId()
         {
-            return new MutationId(logId.asLong(), nextSequenceId());
+            long nextSequenceId = nextSequenceId();
+            return nextSequenceId >= 0
+                 ? new MutationId(logId.asLong(), nextSequenceId)
+                 : null;
         }
 
         private long nextSequenceId()
@@ -331,6 +388,10 @@ public abstract class CoordinatorLog
                 int prevOffset = MutationId.offset(prev);
                 int prevTimestamp = MutationId.timestamp(prev);
 
+                // int overflow
+                if (prevOffset == MAX_OFFSET)
+                    return -1;
+
                 int nextOffset = prevOffset + 1;
                 int nextTimestamp = Math.max(prevTimestamp + 1, (int) 
(currentTimeMillis() / 1000L));
                 long next = MutationId.sequenceId(nextOffset, nextTimestamp);
@@ -343,16 +404,131 @@ public abstract class CoordinatorLog
 
     static class CoordinatorLogReplica extends CoordinatorLog
     {
-        CoordinatorLogReplica(int localHostId, CoordinatorLogId logId, 
Participants participants)
+        CoordinatorLogReplica(
+            String keyspace, Range<Token> range, int localNodeId, 
CoordinatorLogId logId, Participants participants,
+            Node2OffsetsMap witnessedOffsets, Node2OffsetsMap 
persistedOffsets, UnreconciledMutations unreconciledMutations)
         {
-            super(localHostId, logId, participants);
+            super(keyspace, range, localNodeId, logId, participants, 
witnessedOffsets, persistedOffsets, unreconciledMutations);
+        }
+
+        CoordinatorLogReplica(String keyspace, Range<Token> range, int 
localNodeId, CoordinatorLogId logId, Participants participants)
+        {
+            super(keyspace, range, localNodeId, logId, participants);
         }
 
         @Override
-        void receivedWriteResponse(ShortMutationId mutationId, int fromHostId)
+        void receivedWriteResponse(ShortMutationId mutationId, int fromNodeId)
         {
             // no-op
         }
     }
 
+    /*
+     * Persist to / load from system table.
+     */
+
+    private static final String INSERT_QUERY =
+        format("INSERT INTO %s.%s (keyspace_name, range_start, range_end, 
host_id, host_log_id, participants, witnessed_offsets, persisted_offsets) "
+               + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
+               SchemaConstants.SYSTEM_KEYSPACE_NAME, 
SystemKeyspace.COORDINATOR_LOGS);
+
+    void persistToSystemTable()
+    {
+        Map<Integer, List<Integer>> witnessed = new Int2ObjectHashMap<>();
+        Map<Integer, List<Integer>> persisted = new Int2ObjectHashMap<>();
+
+        lock.readLock().lock();
+        try
+        {
+            witnessedOffsets.convertToPrimitiveMap(witnessed);
+            persistedOffsets.convertToPrimitiveMap(persisted);
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
+        executeInternal(INSERT_QUERY, keyspace, range.left.toString(), 
range.right.toString(), logId.hostId,
+                        logId.hostLogId, participants.asSet(), witnessed, 
persisted);
+    }
+
+    void updateLogsInSystemTable()
+    {
+        Offsets.Mutable localWitnessed;
+        Map<Integer, List<Integer>> witnessed = new Int2ObjectHashMap<>();
+        Map<Integer, List<Integer>> persisted = new Int2ObjectHashMap<>();
+
+        lock.readLock().lock();
+        try
+        {
+            localWitnessed = 
Offsets.Mutable.copy(witnessedOffsets.get(localNodeId));
+
+            witnessedOffsets.convertToPrimitiveMap(witnessed);
+            persistedOffsets.convertToPrimitiveMap(persisted);
+
+            persisted.put(localNodeId, witnessed.get(localNodeId));
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
+
+        executeInternal(INSERT_QUERY, keyspace, range.left.toString(), 
range.right.toString(), logId.hostId,
+                        logId.hostLogId, participants.asSet(), witnessed, 
persisted);
+
+        lock.writeLock().lock();
+        try
+        {
+            persistedOffsets.set(localNodeId, localWitnessed);
+            reconciledPersistedOffsets.addAll(persistedOffsets.intersection());
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private static final String SELECT_QUERY =
+        format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND range_start = 
? AND range_end = ?",
+               SchemaConstants.SYSTEM_KEYSPACE_NAME, 
SystemKeyspace.COORDINATOR_LOGS);
+
+    static List<CoordinatorLog> loadFromSystemTable(String keyspace, 
Range<Token> range, int localNodeId)
+    {
+        ArrayList<CoordinatorLog> logs = new ArrayList<>();
+        for (UntypedResultSet.Row row : executeInternal(SELECT_QUERY, 
keyspace, range.left.toString(), range.right.toString()))
+        {
+            int nodeId = row.getInt("host_id");
+            int hostLogId = row.getInt("host_log_id");
+            CoordinatorLogId logId = new CoordinatorLogId(nodeId, hostLogId);
+            Set<Integer> participants = row.getFrozenSet("participants", 
Int32Type.instance);
+            Map<Integer, List<Integer>> witnessedOffsets =
+                row.getMap("witnessed_offsets", Int32Type.instance, 
ListType.getInstance(Int32Type.instance, false));
+            Map<Integer, List<Integer>> persistedOffsets =
+                row.getMap("persisted_offsets", Int32Type.instance, 
ListType.getInstance(Int32Type.instance, false));
+            Node2OffsetsMap witnessed = fromPrimitiveMap(logId, 
witnessedOffsets);
+            Node2OffsetsMap persisted = fromPrimitiveMap(logId, 
persistedOffsets);
+            UnreconciledMutations unreconciled = 
UnreconciledMutations.loadFromJournal(witnessed, localNodeId);
+            CoordinatorLog log =
+                CoordinatorLog.recreate(keyspace, range, localNodeId, logId, 
new Participants(participants), witnessed, persisted, unreconciled);
+            logs.add(log);
+        }
+        return logs;
+    }
+
+    private static final String DELETE_QUERY =
+        format("DELETE FROM %s.%s WHERE keyspace_name = ? AND range_start = ? 
AND range_end = ? AND host_id = ? AND host_log_id = ?",
+               SchemaConstants.SYSTEM_KEYSPACE_NAME, 
SystemKeyspace.COORDINATOR_LOGS);
+
+    void deleteFromSystemTable()
+    {
+        executeInternal(DELETE_QUERY, keyspace, range.left.toString(), 
range.right.toString(), logId.hostId, logId.hostLogId);
+    }
+
+    @VisibleForTesting
+    static void overrideMaxOffsetForTesting(int newMaxOffset)
+    {
+        MAX_OFFSET = newMaxOffset;
+    }
+    // deliberately non-volatile: only mutated via overrideMaxOffsetForTesting in test setup;
+    // make it volatile only if a test provably needs the visibility guarantee
+    private static int MAX_OFFSET = Integer.MAX_VALUE;
 }
diff --git 
a/src/java/org/apache/cassandra/replication/MutationTrackingService.java 
b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index 7ec6758707..f780a35bfa 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.replication;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -26,10 +27,9 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
-import java.util.function.IntSupplier;
+import java.util.function.LongSupplier;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -40,10 +40,11 @@ import com.google.common.base.Preconditions;
 import org.agrona.collections.IntArrayList;
 import org.agrona.collections.IntHashSet;
 import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
-import org.apache.cassandra.concurrent.Shutdownable;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Splitter;
@@ -55,10 +56,13 @@ import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.ReplicationParams;
 import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.reads.tracked.TrackedLocalReads;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.ownership.ReplicaGroups;
 import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
 import org.apache.cassandra.utils.FBUtilities;
@@ -66,14 +70,17 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.lang.String.format;
 import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 import static 
org.apache.cassandra.concurrent.ExecutorFactory.SimulatorSemantics.NORMAL;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 
 // TODO (expected): persistence (handle restarts)
 // TODO (expected): handle topology changes
 public class MutationTrackingService
 {
+    private static final ScheduledExecutorPlus executor = 
executorFactory().scheduled("Mutation-Tracking-Service", NORMAL);
+
     /**
      * Split ranges into this many shards.
      *
@@ -89,6 +96,7 @@ public class MutationTrackingService
     private final ConcurrentHashMap<CoordinatorLogId, Shard> log2ShardMap = 
new ConcurrentHashMap<>();
 
     private final ReplicatedOffsetsBroadcaster offsetsBroadcaster = new 
ReplicatedOffsetsBroadcaster();
+    private final LogStatePersister offsetsPersister = new LogStatePersister();
     private final ActiveLogReconciler activeReconciler = new 
ActiveLogReconciler();
 
     private final IncomingMutations incomingMutations = new 
IncomingMutations();
@@ -104,13 +112,16 @@ public class MutationTrackingService
         if (started)
             return;
 
-        logger.info("Starting replication tracking service");
+        prevHostLogId = loadHostLogIdFromSystemTable();
+
+        logger.info("Starting mutation tracking service. Previous host log id: 
{}", prevHostLogId);
 
-        for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces())
-            if (keyspace.useMutationTracking())
-                keyspaceShards.put(keyspace.name, 
KeyspaceShards.make(keyspace, metadata, this::nextHostLogId, this::onNewLog));
+        if (metadata.myNodeId() != null)
+            for (KeyspaceShards ks : 
KeyspaceShards.loadFromSystemTables(metadata, this::nextLogId, this::onNewLog))
+                keyspaceShards.put(ks.keyspace, ks);
 
         offsetsBroadcaster.start();
+        offsetsPersister.start();
 
         ExpiredStatePurger.instance.register(incomingMutations);
 
@@ -124,9 +135,10 @@ public class MutationTrackingService
 
     public void shutdownBlocking() throws InterruptedException
     {
-        offsetsBroadcaster.shutdown();
-        offsetsBroadcaster.awaitTermination(1, TimeUnit.MINUTES);
+        // TODO: FIXME
         activeReconciler.shutdownBlocking();
+        executor.shutdown();
+        executor.awaitTermination(1, TimeUnit.MINUTES);
         ExpiredStatePurger.instance.shutdownBlocking();
     }
 
@@ -165,9 +177,9 @@ public class MutationTrackingService
         activeReconciler.schedule(mutationId, onHost, 
ActiveLogReconciler.Priority.REGULAR);
     }
 
-    public void updateReplicatedOffsets(String keyspace, Range<Token> range, 
List<? extends Offsets> offsets, InetAddressAndPort onHost)
+    public void updateReplicatedOffsets(String keyspace, Range<Token> range, 
List<? extends Offsets> offsets, boolean durable, InetAddressAndPort onHost)
     {
-        getOrCreateShards(keyspace).updateReplicatedOffsets(range, offsets, 
onHost);
+        getOrCreateShards(keyspace).updateReplicatedOffsets(range, offsets, 
durable, onHost);
     }
 
     public boolean startWriting(Mutation mutation)
@@ -264,41 +276,41 @@ public class MutationTrackingService
 
         ClusterMetadata csm = ClusterMetadata.current();
         KeyspaceMetadata ksm = csm.schema.getKeyspaceMetadata(keyspace);
-        return keyspaceShards.computeIfAbsent(keyspace, ignore -> 
KeyspaceShards.make(ksm, csm, this::nextHostLogId, this::onNewLog));
+        return keyspaceShards.computeIfAbsent(keyspace, ignore -> 
KeyspaceShards.make(ksm, csm, this::nextLogId, this::onNewLog));
+    }
+
+    private long nextLogId()
+    {
+        NodeId nodeId = ClusterMetadata.current().myNodeId();
+        Preconditions.checkNotNull(nodeId);
+        return CoordinatorLogId.asLong(nodeId.id(), nextHostLogId());
     }
 
-    // TODO (expected): durability
-    int nextHostLogId()
+    /*
+     * Allocate and persist the next host log id.
+     * We only do this on startup and when rotating logs.
+     */
+    private int nextHostLogId()
     {
-        return nextHostLogId.incrementAndGet();
+        int nextHostLogId = ++prevHostLogId;
+        persistHostLogIdToSystemTable(nextHostLogId);
+        return nextHostLogId;
     }
-    private final AtomicInteger nextHostLogId = new AtomicInteger();
+    private int prevHostLogId;
 
-    public boolean isDurablyReconciled(String keyspace, 
ImmutableCoordinatorLogOffsets logOffsets)
+    public boolean isDurablyReconciled(ImmutableCoordinatorLogOffsets 
logOffsets)
     {
         // Could pass through SSTable bounds to exclude shards for 
non-overlapping ranges, but this will mostly be
         // called on flush for L0 SSTables with wide bounds.
-
-        KeyspaceShards shards = keyspaceShards.get(keyspace);
-        if (shards == null)
-        {
-            logger.debug("Could not find shards for keyspace {}", keyspace);
-            return false;
-        }
-
         for (Long logId : logOffsets)
         {
-            CoordinatorLogId coordinatorLogId = new CoordinatorLogId(logId);
-            CoordinatorLog log = shards.logs.get(coordinatorLogId);
-            if (log == null)
-            {
-                logger.warn("Could not determine lifecycle for unknown logId 
{}, not marking as durably reconciled", coordinatorLogId);
-                return false;
-            }
-            if (!log.isDurablyReconciled(logOffsets))
+            Shard shard = getShardNullable(new CoordinatorLogId(logId));
+            if (shard == null)
+                throw new IllegalStateException("Could not find shard for 
logId " + logId);
+
+            if (!shard.isDurablyReconciled(logId, logOffsets))
                 return false;
         }
-
         return true;
     }
 
@@ -308,17 +320,15 @@ public class MutationTrackingService
         log2ShardMap.put(log.logId, shard);
     }
 
-    private static class KeyspaceShards implements Shard.Subscriber
+    private static class KeyspaceShards
     {
         private final String keyspace;
         private final Map<Range<Token>, Shard> shards;
         private final ReplicaGroups groups;
-        private final BiConsumer<Shard, CoordinatorLog> onNewLog;
 
         private transient final Map<Range<PartitionPosition>, Shard> ppShards;
-        private transient final Map<CoordinatorLogId, CoordinatorLog> logs;
 
-        static KeyspaceShards make(KeyspaceMetadata keyspace, ClusterMetadata 
cluster, IntSupplier logIdProvider, BiConsumer<Shard, CoordinatorLog> onNewLog)
+        static KeyspaceShards make(KeyspaceMetadata keyspace, ClusterMetadata 
cluster, LongSupplier logIdProvider, BiConsumer<Shard, CoordinatorLog> onNewLog)
         {
             
Preconditions.checkArgument(keyspace.params.replicationType.isTracked());
 
@@ -341,26 +351,23 @@ public class MutationTrackingService
 
                 for (Range<Token> tokenRange : ranges)
                 {
-                    shards.put(tokenRange, new Shard(keyspace.name, 
tokenRange, cluster.myNodeId().id(), participants, forRange.lastModified(), 
logIdProvider, onNewLog));
+                    shards.put(tokenRange, new Shard(cluster.myNodeId().id(), 
keyspace.name, tokenRange, participants, logIdProvider, onNewLog));
                     groups.put(tokenRange, forRange.map(original -> 
original.withRange(tokenRange)));
                 }
             });
-            return new KeyspaceShards(keyspace.name, shards, new 
ReplicaGroups(groups), onNewLog);
+            KeyspaceShards keyspaceShards = new KeyspaceShards(keyspace.name, 
shards, new ReplicaGroups(groups));
+            keyspaceShards.persistToSystemTables();
+            return keyspaceShards;
         }
 
-        KeyspaceShards(String keyspace, Map<Range<Token>, Shard> shards, 
ReplicaGroups groups, BiConsumer<Shard, CoordinatorLog> onNewLog)
+        KeyspaceShards(String keyspace, Map<Range<Token>, Shard> shards, 
ReplicaGroups groups)
         {
             this.keyspace = keyspace;
             this.shards = shards;
             this.groups = groups;
-            this.onNewLog = onNewLog;
 
-            this.logs = new HashMap<>();
             HashMap<Range<PartitionPosition>, Shard> ppShards = new 
HashMap<>();
-            shards.forEach((range, shard) -> {
-                ppShards.put(Range.makeRowRange(range), shard);
-                shard.addSubscriber(this);
-            });
+            shards.forEach((range, shard) -> 
ppShards.put(Range.makeRowRange(range), shard));
             this.ppShards = ppShards;
         }
 
@@ -369,9 +376,9 @@ public class MutationTrackingService
             return lookUp(token).nextId();
         }
 
-        void updateReplicatedOffsets(Range<Token> range, List<? extends 
Offsets> offsets, InetAddressAndPort onHost)
+        void updateReplicatedOffsets(Range<Token> range, List<? extends 
Offsets> offsets, boolean durable, InetAddressAndPort onHost)
         {
-            shards.get(range).updateReplicatedOffsets(offsets, onHost);
+            shards.get(range).updateReplicatedOffsets(offsets, durable, 
onHost);
         }
 
         boolean startWriting(Mutation mutation)
@@ -430,66 +437,105 @@ public class MutationTrackingService
             return shards.get(groups.forRange(token).range());
         }
 
-        @Override
-        public void onLogCreation(CoordinatorLog log)
+        void persistToSystemTables()
         {
-            logger.debug("Indexing created log {}", log);
-            logs.put(log.logId, log);
+            for (Shard shard : shards.values()) shard.persistToSystemTables();
         }
 
-        @Override
-        public void onSubscribe(CoordinatorLog currentLog)
+        static List<KeyspaceShards> loadFromSystemTables(ClusterMetadata 
cluster, LongSupplier logIdProvider, BiConsumer<Shard, CoordinatorLog> onNewLog)
         {
-            logger.debug("Indexing current log {}", currentLog);
-            logs.put(currentLog.logId, currentLog);
+            Map<String, Map<Range<Token>, Shard>> groupedShards = new 
HashMap<>();
+            for (Shard shard : 
Shard.loadFromSystemTables(cluster.myNodeId().id(), logIdProvider, onNewLog))
+                groupedShards.computeIfAbsent(shard.keyspace, k -> new 
HashMap<>()).put(shard.range, shard);
+            List<KeyspaceShards> keyspaceShards = new ArrayList<>();
+            for (Map.Entry<String, Map<Range<Token>, Shard>> entry : 
groupedShards.entrySet())
+            {
+                ReplicationParams params = 
cluster.schema.getKeyspaceMetadata(entry.getKey()).params.replication;
+                ReplicaGroups originalGroups = 
cluster.placements.get(params).writes; // prior to splitting
+
+                Map<Range<Token>, VersionedEndpoints.ForRange> splitGroups = 
new HashMap<>();
+                for (Range<Token> splitRange : entry.getValue().keySet())
+                    splitGroups.put(splitRange, 
originalGroups.matchRange(splitRange));
+
+                keyspaceShards.add(new KeyspaceShards(entry.getKey(), 
entry.getValue(), new ReplicaGroups(splitGroups)));
+            }
+            return keyspaceShards;
         }
     }
 
-    // TODO (later): a more intelligent heuristic for offsets included in 
broadcasts
-    private static class ReplicatedOffsetsBroadcaster implements Runnable, 
Shutdownable
+    private static final String HOST_LOG_ID_KEY = "local";
+
+    private static final String INSERT_QUERY =
+        format("INSERT INTO %s.%s (key, host_log_id) VALUES (?, ?)",
+               SchemaConstants.SYSTEM_KEYSPACE_NAME, 
SystemKeyspace.HOST_LOG_ID);
+
+    static void persistHostLogIdToSystemTable(int hostLogId)
+    {
+        executeInternal(INSERT_QUERY, HOST_LOG_ID_KEY, hostLogId);
+    }
+
+    private static final String SELECT_QUERY =
+        format("SELECT * FROM %s.%s WHERE key = ?", 
SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.HOST_LOG_ID);
+
+    static int loadHostLogIdFromSystemTable()
     {
-        private static final ScheduledExecutorPlus executor =
-            executorFactory().scheduled("Replicated-Offsets-Broadcaster", 
NORMAL);
+        UntypedResultSet rows = executeInternal(SELECT_QUERY, HOST_LOG_ID_KEY);
+        if (rows.isEmpty())
+            return 0;
+        return rows.one().getInt("host_log_id");
+    }
 
+    // TODO (later): a more intelligent heuristic for offsets included in 
broadcasts
+    private static class ReplicatedOffsetsBroadcaster
+    {
         // TODO (later): a more intelligent heuristic for scheduling broadcasts
-        private static final long BROADCAST_INTERVAL_MILLIS = 200;
+        private static final long TRANSIENT_BROADCAST_INTERVAL_MILLIS = 200;
+        private static final long DURABLE_BROADCAST_INTERVAL_MILLIS = 60_000;
 
         void start()
         {
-            executor.scheduleWithFixedDelay(this, BROADCAST_INTERVAL_MILLIS, 
BROADCAST_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+            executor.scheduleWithFixedDelay(() -> run(false),
+                                            
TRANSIENT_BROADCAST_INTERVAL_MILLIS,
+                                            
TRANSIENT_BROADCAST_INTERVAL_MILLIS,
+                                            TimeUnit.MILLISECONDS);
+            executor.scheduleWithFixedDelay(() -> run(true),
+                                            DURABLE_BROADCAST_INTERVAL_MILLIS,
+                                            DURABLE_BROADCAST_INTERVAL_MILLIS,
+                                            TimeUnit.MILLISECONDS);
         }
 
-        @Override
-        public boolean isTerminated()
+        public void run(boolean durable)
         {
-            return executor.isTerminated();
+            MutationTrackingService.instance.forEachKeyspace(ks -> run(ks, 
durable));
         }
 
-        @Override
-        public void shutdown()
+        private void run(KeyspaceShards shards, boolean durable)
         {
-            executor.shutdown();
+            shards.forEachShard(sh -> run(sh, durable));
         }
 
-        @Override
-        public Object shutdownNow()
+        private void run(Shard shard, boolean durable)
         {
-            return executor.shutdownNow();
-        }
+            BroadcastLogOffsets replicatedOffsets = 
shard.collectReplicatedOffsets(durable);
+            if (replicatedOffsets.isEmpty())
+                return;
 
-        @Override
-        public boolean awaitTermination(long timeout, TimeUnit units) throws 
InterruptedException
-        {
-            return executor.awaitTermination(timeout, units);
+            Message<BroadcastLogOffsets> message = 
Message.out(Verb.BROADCAST_LOG_OFFSETS, replicatedOffsets);
+
+            for (InetAddressAndPort target : shard.remoteReplicas())
+                if (FailureDetector.instance.isAlive(target))
+                    MessagingService.instance().send(message, target);
         }
+    }
+
+    private static class LogStatePersister implements Runnable
+    {
+        // TODO (expected): consider a different interval
+        private static final long PERSIST_INTERVAL_MINUTES = 1;
 
-        public void shutdownBlocking() throws InterruptedException
+        void start()
         {
-            if (!executor.isTerminated())
-            {
-                executor.shutdown();
-                executor.awaitTermination(1, MINUTES);
-            }
+            executor.scheduleWithFixedDelay(this, PERSIST_INTERVAL_MINUTES, 
PERSIST_INTERVAL_MINUTES, TimeUnit.MINUTES);
         }
 
         @Override
@@ -505,22 +551,15 @@ public class MutationTrackingService
 
         private void run(Shard shard)
         {
-            BroadcastLogOffsets replicatedOffsets = 
shard.collectReplicatedOffsets();
-            if (replicatedOffsets.isEmpty())
-                return;
-
-            Message<BroadcastLogOffsets> message = 
Message.out(Verb.BROADCAST_LOG_OFFSETS, replicatedOffsets);
-
-            for (InetAddressAndPort target : shard.remoteReplicas())
-                if (FailureDetector.instance.isAlive(target))
-                    MessagingService.instance().send(message, target);
+            shard.updateLogsInSystemTable();
         }
     }
 
     @VisibleForTesting
     public void broadcastOffsetsForTesting()
     {
-        offsetsBroadcaster.run();
+        offsetsBroadcaster.run(false);
+        offsetsBroadcaster.run(true);
     }
 
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java 
b/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java
index 12fa18c98a..1001d855cd 100644
--- a/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java
+++ b/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java
@@ -17,6 +17,14 @@
  */
 package org.apache.cassandra.replication;
 
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Preconditions;
+
 import org.agrona.collections.Int2ObjectHashMap;
 import org.agrona.collections.IntObjConsumer;
 
@@ -25,24 +33,93 @@ import org.agrona.collections.IntObjConsumer;
  */
 public class Node2OffsetsMap
 {
-    private final Int2ObjectHashMap<Offsets> map = new Int2ObjectHashMap<>();
+    private final Int2ObjectHashMap<Offsets.Mutable> offsetsMap;
+
+    public Node2OffsetsMap()
+    {
+        offsetsMap = new Int2ObjectHashMap<>(8, 0.65f, false);
+    }
+
+    public static Node2OffsetsMap forParticipants(CoordinatorLogId logId, 
Participants participants)
+    {
+        Node2OffsetsMap map = new Node2OffsetsMap();
+        for (int i = 0; i < participants.size(); i++)
+            map.set(participants.get(i), new Offsets.Mutable(logId));
+        return map;
+    }
+
+    void set(int node, Offsets.Mutable offsets)
+    {
+        offsetsMap.put(node, offsets);
+    }
+
+    @Nonnull
+    Offsets.Mutable get(int node)
+    {
+        return Preconditions.checkNotNull(offsetsMap.get(node));
+    }
+
+    Offsets.Mutable intersection()
+    {
+        Iterator<Offsets.Mutable> iter = offsetsMap.values().iterator();
+
+        Preconditions.checkArgument(iter.hasNext());
+        if (offsetsMap.size() == 1)
+            return Offsets.Mutable.copy(iter.next());
+
+        Offsets.Mutable intersection = iter.next();
+        while (iter.hasNext())
+            intersection = Offsets.Mutable.intersection(intersection, 
iter.next());
+        return intersection;
+    }
 
     public void add(int node, Offsets offsets)
     {
-        Offsets.Mutable current = (Offsets.Mutable) map.get(node);
+        Offsets.Mutable current = offsetsMap.get(node);
         if (current != null)
             current.addAll(offsets);
         else
-            map.put(node, Offsets.Mutable.copy(offsets));
+            offsetsMap.put(node, Offsets.Mutable.copy(offsets));
     }
 
-    public void forEach(IntObjConsumer<Offsets> consumer)
+    public void forEach(IntObjConsumer<Offsets.Mutable> consumer)
     {
-        map.forEachInt(consumer);
+        offsetsMap.forEachInt(consumer);
     }
 
     public void clear()
     {
-        map.clear();
+        offsetsMap.clear();
+    }
+
+    public int size()
+    {
+        return offsetsMap.size();
+    }
+
+    void convertToPrimitiveMap(Map<Integer, List<Integer>> into)
+    {
+        for (Int2ObjectHashMap<Offsets.Mutable>.EntryIterator iter = 
offsetsMap.entrySet().iterator(); iter.hasNext();)
+        {
+            iter.next();
+            into.put(iter.getIntKey(), iter.getValue().asList());
+        }
+    }
+
+    static Node2OffsetsMap fromPrimitiveMap(CoordinatorLogId logId, 
Map<Integer, List<Integer>> from)
+    {
+        Node2OffsetsMap map = new Node2OffsetsMap();
+        for (Map.Entry<Integer, List<Integer>> entry : from.entrySet())
+            map.set(entry.getKey(), Offsets.fromList(logId, entry.getValue()));
+        return map;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof Node2OffsetsMap))
+            return false;
+        Node2OffsetsMap that = (Node2OffsetsMap) o;
+        return this.offsetsMap.equals(that.offsetsMap);
     }
 }
diff --git a/src/java/org/apache/cassandra/replication/Offsets.java 
b/src/java/org/apache/cassandra/replication/Offsets.java
index fc080452a7..dfb4f3f404 100644
--- a/src/java/org/apache/cassandra/replication/Offsets.java
+++ b/src/java/org/apache/cassandra/replication/Offsets.java
@@ -21,6 +21,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 
+import org.agrona.collections.IntArrayList;
 import org.apache.cassandra.db.Digest;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -34,6 +35,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 
 public abstract class Offsets implements Iterable<ShortMutationId>
 {
@@ -200,6 +202,19 @@ public abstract class Offsets implements 
Iterable<ShortMutationId>
             digest.updateWithInt(bounds[i]);
     }
 
+    List<Integer> asList()
+    {
+        return new IntArrayList(Arrays.copyOf(bounds, size), size, 
IntArrayList.DEFAULT_NULL_VALUE);
+    }
+
+    static Mutable fromList(CoordinatorLogId logId, List<Integer> list)
+    {
+        int[] bounds = new int[list.size()];
+        for (int i = 0; i < list.size(); i++)
+            bounds[i] = list.get(i);
+        return new Mutable(logId, bounds, bounds.length);
+    }
+
     static abstract class AbstractMutable<T extends AbstractMutable<T>> 
extends Offsets implements OffsetReciever
     {
         AbstractMutable(RangeIterator rangeIterator)
@@ -380,6 +395,11 @@ public abstract class Offsets implements 
Iterable<ShortMutationId>
             // the offset is before all existing ranges, in-between two, or 
after all exsiting ranges
         }
 
+        public void add(int... offsets)
+        {
+            for (int offset : offsets) add(offset);
+        }
+
         private enum AddAction
         {
             INSERT, MOVE, INCLUDE;
@@ -673,6 +693,19 @@ public abstract class Offsets implements 
Iterable<ShortMutationId>
         {
             return Offsets.intersection(a, b, adaptor);
         }
+
+        public static Mutable intersection(Offsets.Mutable... offsets)
+        {
+            Preconditions.checkArgument(offsets.length > 0);
+
+            if (offsets.length == 1)
+                return copy(offsets[0]);
+
+            Mutable intersection = offsets[0];
+            for (int i = 1; i < offsets.length; i++)
+                intersection = intersection(intersection, offsets[i]);
+            return intersection;
+        }
     }
 
     public static class Immutable extends Offsets
diff --git a/src/java/org/apache/cassandra/replication/Participants.java 
b/src/java/org/apache/cassandra/replication/Participants.java
index f245797586..89494f240c 100644
--- a/src/java/org/apache/cassandra/replication/Participants.java
+++ b/src/java/org/apache/cassandra/replication/Participants.java
@@ -19,6 +19,9 @@ package org.apache.cassandra.replication;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Set;
+
+import org.agrona.collections.IntHashSet;
 
 public class Participants
 {
@@ -63,4 +66,21 @@ public class Participants
     {
         return Arrays.toString(hosts);
     }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof Participants))
+            return false;
+        Participants that = (Participants) o;
+        return Arrays.equals(this.hosts, that.hosts);
+    }
+
+    // equals() is defined over hosts, so hashCode() must agree with it
+    // (hash-collection contract); Arrays.hashCode matches Arrays.equals.
+    @Override
+    public int hashCode()
+    {
+        return Arrays.hashCode(hosts);
+    }
+
+    Set<Integer> asSet()
+    {
+        IntHashSet set = new IntHashSet(hosts.length);
+        for (int host : hosts)
+            set.add(host);
+        return set;
+    }
 }
diff --git a/src/java/org/apache/cassandra/replication/Shard.java 
b/src/java/org/apache/cassandra/replication/Shard.java
index 6588bede09..6f198e686d 100644
--- a/src/java/org/apache/cassandra/replication/Shard.java
+++ b/src/java/org/apache/cassandra/replication/Shard.java
@@ -18,72 +18,100 @@
 package org.apache.cassandra.replication;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.function.BiConsumer;
-import java.util.function.IntSupplier;
+import java.util.function.LongSupplier;
 
 import javax.annotation.Nonnull;
 
 import com.google.common.base.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.agrona.collections.IntArrayList;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.replication.CoordinatorLog.CoordinatorLogPrimary;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.tcm.ClusterMetadata;
-import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.jctools.maps.NonBlockingHashMapLong;
 
+import static java.lang.String.format;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+
 public class Shard
 {
-    private final String keyspace;
-    private final Range<Token> tokenRange;
-    private final int localHostId;
-    private final Participants participants;
-    private final Epoch sinceEpoch;
-    private final BiConsumer<Shard, CoordinatorLog> onNewLog;
-    private final NonBlockingHashMapLong<CoordinatorLog> logs;
-    // TODO (expected): add support for log rotation
-    private final CoordinatorLog.CoordinatorLogPrimary currentLocalLog;
+    private static final Logger logger = LoggerFactory.getLogger(Shard.class);
 
-    private final List<Subscriber> subscribers = new ArrayList<>();
+    final int localNodeId;
+    final String keyspace;
+    final Range<Token> range;
+    final Participants participants;
 
-    public interface Subscriber
-    {
-        default void onLogCreation(CoordinatorLog log) {}
-        default void onSubscribe(CoordinatorLog currentLog) {}
-    }
+    private final LongSupplier logIdProvider;
+    private final BiConsumer<Shard, CoordinatorLog> onNewLog;
+    private final NonBlockingHashMapLong<CoordinatorLog> logs;
+    private volatile CoordinatorLogPrimary currentLocalLog;
 
-    Shard(String keyspace,
-          Range<Token> tokenRange,
-          int localHostId,
+    Shard(int localNodeId,
+          String keyspace,
+          Range<Token> range,
           Participants participants,
-          Epoch sinceEpoch,
-          IntSupplier logIdProvider,
+          List<CoordinatorLog> logs,
+          LongSupplier logIdProvider,
           BiConsumer<Shard, CoordinatorLog> onNewLog)
     {
-        Preconditions.checkArgument(participants.contains(localHostId));
+        Preconditions.checkArgument(participants.contains(localNodeId));
 
+        this.localNodeId = localNodeId;
         this.keyspace = keyspace;
-        this.tokenRange = tokenRange;
-        this.localHostId = localHostId;
+        this.range = range;
         this.participants = participants;
-        this.sinceEpoch = sinceEpoch;
+        this.logIdProvider = logIdProvider;
         this.logs = new NonBlockingHashMapLong<>();
         this.onNewLog = onNewLog;
-        this.currentLocalLog = startNewLog(localHostId, 
logIdProvider.getAsInt(), participants);
-        CoordinatorLogId logId = currentLocalLog.logId;
-        Preconditions.checkArgument(!logId.isNone());
-        logs.put(logId.asLong(), currentLocalLog);
+        for (CoordinatorLog log : logs)
+        {
+            this.logs.put(log.logId.asLong(), log);
+            onNewLog.accept(Shard.this, log);
+        }
+        this.currentLocalLog = createNewPrimaryLog();
+    }
+
+    Shard(int localNodeId, String keyspace, Range<Token> range, Participants 
participants, LongSupplier logIdProvider, BiConsumer<Shard, CoordinatorLog> 
onNewLog)
+    {
+        this(localNodeId, keyspace, range, participants, 
Collections.emptyList(), logIdProvider, onNewLog);
     }
 
     MutationId nextId()
     {
-        return currentLocalLog.nextId();
+        MutationId nextId = currentLocalLog.nextId();
+        if (nextId != null)
+            return nextId;
+        return maybeRotateLocalLogAndGetNextId();
+    }
+
+    // if ids overflow, we need to rotate the local log
+    private synchronized MutationId maybeRotateLocalLogAndGetNextId()
+    {
+        MutationId nextId = currentLocalLog.nextId();
+        if (nextId != null) // another thread got to rotate before us
+            return nextId;
+        CoordinatorLogId oldLogId = currentLocalLog.logId;
+        currentLocalLog = createNewPrimaryLog();
+        logger.info("Rotated primary log for {}/{} from {} to {}", keyspace, range, oldLogId, currentLocalLog.logId);
+        return nextId();
+    }
 
     void receivedWriteResponse(ShortMutationId mutationId, InetAddressAndPort 
fromHost)
@@ -92,11 +120,11 @@ public class Shard
         getOrCreate(mutationId).receivedWriteResponse(mutationId, fromHostId);
     }
 
-    void updateReplicatedOffsets(List<? extends Offsets> offsets, 
InetAddressAndPort onHost)
+    void updateReplicatedOffsets(List<? extends Offsets> offsets, boolean 
durable, InetAddressAndPort onHost)
     {
         int onHostId = ClusterMetadata.current().directory.peerId(onHost).id();
         for (Offsets logOffsets : offsets)
-            
getOrCreate(logOffsets.logId()).updateReplicatedOffsets(logOffsets, onHostId);
+            
getOrCreate(logOffsets.logId()).updateReplicatedOffsets(logOffsets, durable, 
onHostId);
     }
 
     boolean startWriting(Mutation mutation)
@@ -143,39 +171,31 @@ public class Shard
         for (int i = 0, size = participants.size(); i < size; ++i)
         {
             int hostId = participants.get(i);
-            if (hostId != localHostId)
+            if (hostId != localNodeId)
                 replicas.add(ClusterMetadata.current().directory.endpoint(new 
NodeId(hostId)));
         }
         return replicas;
     }
 
+    boolean isDurablyReconciled(long logId, CoordinatorLogOffsets<?> logOffsets)
+    {
+        // A log id we have never witnessed cannot be confirmed as reconciled;
+        // answer false out of caution instead of throwing NPE on the map miss.
+        CoordinatorLog log = logs.get(logId);
+        return log != null && log.isDurablyReconciled(logOffsets);
+    }
+
     /**
      * Collects replicated offsets for the logs owned by this coordinator on 
this shard.
      */
-    BroadcastLogOffsets collectReplicatedOffsets()
+    BroadcastLogOffsets collectReplicatedOffsets(boolean durable)
     {
         List<Offsets.Immutable> offsets = new ArrayList<>();
         for (CoordinatorLog log : logs.values())
         {
-            Offsets.Immutable logOffsets = log.collectReplicatedOffsets();
+            Offsets.Immutable logOffsets = 
log.collectReplicatedOffsets(durable);
             if (logOffsets != null)
                 offsets.add(logOffsets);
         }
 
-        return new BroadcastLogOffsets(keyspace, tokenRange, offsets);
-    }
-
-    /**
-     * Creates a new coordinator log for this host. Primarily on Shard init 
(node startup or topology change).
-     * Also on keyspace creation.
-     */
-    private CoordinatorLog.CoordinatorLogPrimary startNewLog(int localHostId, 
int hostLogId, Participants participants)
-    {
-        CoordinatorLogId logId = new CoordinatorLogId(localHostId, hostLogId);
-        CoordinatorLog.CoordinatorLogPrimary log =
-            new CoordinatorLog.CoordinatorLogPrimary(localHostId, logId, 
participants);
-        onNewLog.accept(this, log);
-        return log;
+        return new BroadcastLogOffsets(keyspace, range, offsets, durable);
     }
 
     private CoordinatorLog getOrCreate(Mutation mutation)
@@ -203,27 +223,63 @@ public class Shard
     private CoordinatorLog getOrCreate(long logId)
     {
         CoordinatorLog log = logs.get(logId);
-        if (log != null)
-            return log;
-        CoordinatorLog newLog = logs.computeIfAbsent(logId, ignore -> 
CoordinatorLog.create(localHostId, new CoordinatorLogId(logId), participants));
-        onNewLog.accept(this, newLog);
-        for (Subscriber subscriber : subscribers)
-            subscriber.onLogCreation(newLog);
-        return newLog;
+        return log != null ? log : createNewLog(logId);
     }
 
-    public void addSubscriber(Subscriber subscriber)
+    /**
+     * Creates a new coordinator log for this host. Primarily on Shard init 
(node startup or topology change) and on keyspace creation.
+     */
+    private CoordinatorLog createNewLog(long logId)
     {
-        subscriber.onSubscribe(currentLocalLog);
-        subscribers.add(subscriber);
+        CoordinatorLog next = CoordinatorLog.create(keyspace, range, 
localNodeId, new CoordinatorLogId(logId), participants);
+        CoordinatorLog prev = logs.putIfAbsent(logId, next);
+        if (null == prev) onNewLog.accept(this, next);
+        return null != prev ? prev : next;
     }
 
-    private CoordinatorLog create(long logId)
+    private CoordinatorLogPrimary createNewPrimaryLog()
     {
-        CoordinatorLog log = CoordinatorLog.create(localHostId, new 
CoordinatorLogId(logId), participants);
-        onNewLog.accept(this, log);
-        for (Subscriber subscriber : subscribers)
-            subscriber.onLogCreation(log);
-        return log;
+        return (CoordinatorLogPrimary) createNewLog(logIdProvider.getAsLong());
+    }
+
+    /*
+     * Persist to / load from system table.
+     */
+
+    private static final String INSERT_QUERY =
+        format("INSERT INTO %s.%s (keyspace_name, range_start, range_end, 
participants) VALUES (?, ?, ?, ?)",
+               SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.SHARDS);
+
+    void persistToSystemTables()
+    {
+        executeInternal(INSERT_QUERY, keyspace, range.left.toString(), 
range.right.toString(), participants.asSet());
+        for (CoordinatorLog log : logs.values())
+            log.persistToSystemTable();
+    }
+
+    void updateLogsInSystemTable()
+    {
+        for (CoordinatorLog log : logs.values())
+            log.updateLogsInSystemTable();
+    }
+
+    private static final String SELECT_QUERY =
+        format("SELECT * FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, 
SystemKeyspace.SHARDS);
+
+    static ArrayList<Shard> loadFromSystemTables(int localNodeId, LongSupplier 
logIdProvider, BiConsumer<Shard, CoordinatorLog> onNewLog)
+    {
+        Token.TokenFactory factory = 
ClusterMetadata.current().partitioner.getTokenFactory();
+        ArrayList<Shard> shards = new ArrayList<>();
+        for (UntypedResultSet.Row row : executeInternal(SELECT_QUERY))
+        {
+            String keyspace = row.getString("keyspace_name");
+            String rangeStart = row.getString("range_start");
+            String rangeEnd = row.getString("range_end");
+            Range<Token> range = new Range<>(factory.fromString(rangeStart), 
factory.fromString(rangeEnd));
+            Set<Integer> participants = row.getFrozenSet("participants", 
Int32Type.instance);
+            List<CoordinatorLog> logs = 
CoordinatorLog.loadFromSystemTable(keyspace, range, localNodeId);
+            shards.add(new Shard(localNodeId, keyspace, range, new 
Participants(participants), logs, logIdProvider, onNewLog));
+        }
+        return shards;
     }
 }
diff --git 
a/src/java/org/apache/cassandra/replication/UnreconciledMutations.java 
b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
index 5a6e9d1f55..63dff77349 100644
--- a/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
+++ b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
@@ -23,6 +23,7 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 
@@ -197,4 +198,41 @@ public class UnreconciledMutations
             return collect(statesSet.subSet(start, end), tableId, 
includePending, into);
         }
     }
+
+    @VisibleForTesting
+    boolean equalsForTesting(UnreconciledMutations other)
+    {
+        return this.statesMap.equals(other.statesMap) && 
this.statesSet.equals(other.statesSet);
+    }
+
+    @VisibleForTesting
+    void addDirectly(Mutation mutation)
+    {
+        Entry entry = Entry.create(mutation);
+        entry.visibility = Visibility.VISIBLE;
+        statesMap.put(entry.offset, entry);
+        statesSet.add(entry);
+    }
+
+    static UnreconciledMutations loadFromJournal(Node2OffsetsMap 
witnessedOffsets, int localNodeId)
+    {
+        UnreconciledMutations result = new UnreconciledMutations();
+
+        Offsets.Mutable witnessed = witnessedOffsets.get(localNodeId);
+        Offsets.Mutable reconciled = witnessedOffsets.intersection();
+
+        // difference between locally witnessed offsets and fully reconciled 
ones is all the ids
+        // that need to be loaded into UnreconciledMutations index
+        Offsets.RangeIterator iter = 
Offsets.difference(witnessed.rangeIterator(), reconciled.rangeIterator());
+        while (iter.tryAdvance())
+        {
+            for (int offset = iter.start(), end = iter.end(); offset <= end; 
offset++)
+            {
+                ShortMutationId id = new ShortMutationId(witnessed.logId, 
offset);
+                result.addDirectly(MutationJournal.instance.read(id));
+            }
+        }
+
+        return result;
+    }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/SystemKeyspaceStorageTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/SystemKeyspaceStorageTest.java
index c4a4079225..db2a2be94d 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/SystemKeyspaceStorageTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/SystemKeyspaceStorageTest.java
@@ -38,7 +38,7 @@ import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static org.apache.cassandra.db.SystemKeyspace.SNAPSHOT_TABLE_NAME;
+import static org.apache.cassandra.db.SystemKeyspace.METADATA_SNAPSHOTS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -183,7 +183,7 @@ public class SystemKeyspaceStorageTest extends 
CoordinatorPathTestBase
 
     public static void deleteSnapshot(long epoch)
     {
-        String query = String.format("DELETE FROM %s.%s WHERE epoch = ?", 
SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME);
+        String query = String.format("DELETE FROM %s.%s WHERE epoch = ?", 
SchemaConstants.SYSTEM_KEYSPACE_NAME, METADATA_SNAPSHOTS);
         QueryProcessor.executeInternal(query, epoch);
     }
 }
diff --git a/test/unit/org/apache/cassandra/cql3/SystemKeyspaceQueryTest.java 
b/test/unit/org/apache/cassandra/cql3/SystemKeyspaceQueryTest.java
index 8da29df678..f5789d01f5 100644
--- a/test/unit/org/apache/cassandra/cql3/SystemKeyspaceQueryTest.java
+++ b/test/unit/org/apache/cassandra/cql3/SystemKeyspaceQueryTest.java
@@ -26,7 +26,7 @@ import org.apache.cassandra.utils.TimeUUID;
 import static org.apache.cassandra.db.SystemKeyspace.BATCHES;
 import static org.apache.cassandra.db.SystemKeyspace.LOCAL;
 import static org.apache.cassandra.db.SystemKeyspace.METADATA_LOG;
-import static org.apache.cassandra.db.SystemKeyspace.SNAPSHOT_TABLE_NAME;
+import static org.apache.cassandra.db.SystemKeyspace.METADATA_SNAPSHOTS;
 import static org.apache.cassandra.schema.SchemaConstants.SYSTEM_KEYSPACE_NAME;
 
 public class SystemKeyspaceQueryTest extends CQLTester
@@ -44,7 +44,7 @@ public class SystemKeyspaceQueryTest extends CQLTester
         assertRowCountNet(executeNet(String.format("SELECT * FROM %s.%s WHERE 
epoch = 1",
                                                    SYSTEM_KEYSPACE_NAME, 
METADATA_LOG)), 0);
         assertRowCountNet(executeNet(String.format("SELECT * FROM %s.%s WHERE 
epoch = 1",
-                                                   SYSTEM_KEYSPACE_NAME, 
SNAPSHOT_TABLE_NAME)), 0);
+                                                   SYSTEM_KEYSPACE_NAME, 
METADATA_SNAPSHOTS)), 0);
         // system.batches table uses LocalPartitioner
         assertRowCountNet(executeNet(String.format("SELECT * FROM %s.%s WHERE 
id = %s",
                                                    SYSTEM_KEYSPACE_NAME, 
BATCHES,
diff --git 
a/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java 
b/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java
index 91a2788ddf..07f4bc5eaf 100644
--- a/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java
+++ b/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java
@@ -280,7 +280,7 @@ public class CoordinatorLogOffsetsTest
             ImmutableCoordinatorLogOffsets logOffsets = new 
ImmutableCoordinatorLogOffsets.Builder()
                     .add(mutation.id())
                     .build();
-            
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(ks, 
logOffsets)).isTrue();
+            
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsets)).isTrue();
         }
 
         // Applied locally but not on remote replicas
@@ -292,7 +292,7 @@ public class CoordinatorLogOffsetsTest
             ImmutableCoordinatorLogOffsets logOffsets = new 
ImmutableCoordinatorLogOffsets.Builder()
                     .add(mutation.id())
                     .build();
-            
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(ks, 
logOffsets)).isFalse();
+            
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsets)).isFalse();
         }
 
         // Applied on remote replicas but not locally
@@ -306,7 +306,7 @@ public class CoordinatorLogOffsetsTest
             ImmutableCoordinatorLogOffsets logOffsets = new 
ImmutableCoordinatorLogOffsets.Builder()
                     .add(mutation.id())
                     .build();
-            
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(ks, 
logOffsets)).isFalse();
+            
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsets)).isFalse();
         }
 
         // If no replicas are aware of a log, it should be considered 
unreconciled out of caution
@@ -326,7 +326,7 @@ public class CoordinatorLogOffsetsTest
 
             ImmutableCoordinatorLogOffsets.Builder logOffsetsBuilder = new 
ImmutableCoordinatorLogOffsets.Builder();
             logOffsetsBuilder.add(fakeMutationId);
-            
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(ks, 
logOffsetsBuilder.build())).isFalse();
+            
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsetsBuilder.build())).isFalse();
         }
 
         MutationTrackingService.instance.shutdownBlocking();
diff --git a/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java 
b/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java
index f89f1a521b..5122683b41 100644
--- a/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java
+++ b/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -30,6 +29,7 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.replication.CoordinatorLog.CoordinatorLogPrimary;
 import org.apache.cassandra.schema.KeyspaceParams;
@@ -38,11 +38,21 @@ import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
 public class CoordinatorLogTest
 {
     private static final int LOCAL_HOST_ID = 1;
-    private static final CoordinatorLogId LOG_ID = new 
CoordinatorLogId(LOCAL_HOST_ID, 1);
-    private static final Participants PARTICIPANTS = new 
Participants(List.of(LOCAL_HOST_ID, 2, 3));
+    private static final int REMOTE_HOST_ID_1 = 2;
+    private static final int REMOTE_HOST_ID_2 = 3;
+
+    private static final CoordinatorLogId LOCAL_LOG_ID = new 
CoordinatorLogId(LOCAL_HOST_ID, 1);
+    private static final CoordinatorLogId REPLICA_LOG_ID = new 
CoordinatorLogId(REMOTE_HOST_ID_1, 1);
+
+    private static final Participants PARTICIPANTS =
+        new Participants(List.of(LOCAL_HOST_ID, REMOTE_HOST_ID_1, 
REMOTE_HOST_ID_2));
 
     private static final String KEYSPACE = "cltks";
     private static final String TABLE = "cltt";
@@ -57,6 +67,7 @@ public class CoordinatorLogTest
                                                  .addClusteringColumn("ck", 
UTF8Type.instance)
                                                  .addRegularColumn("value", 
UTF8Type.instance)
                                                  .build());
+        MutationJournal.instance.start();
     }
 
     private static Token tk(String key)
@@ -66,7 +77,7 @@ public class CoordinatorLogTest
 
     private static Offsets toOffsets(MutationId... ids)
     {
-        Offsets.Mutable list = new Offsets.Mutable(LOG_ID);
+        Offsets.Mutable list = new Offsets.Mutable(LOCAL_LOG_ID);
         for (MutationId id : ids)
             list.add(id.offset());
         return list;
@@ -74,15 +85,15 @@ public class CoordinatorLogTest
 
     private static void assertUnreconciled(Token token, TableId tableId, 
CoordinatorLog log, boolean includePending, Offsets expectedReconciled, 
MutationId... expectedIds)
     {
-        Offsets.Mutable reconciled = new Offsets.Mutable(LOG_ID);
-        Offsets.Mutable unreconciled = new Offsets.Mutable(LOG_ID);
+        Offsets.Mutable reconciled = new Offsets.Mutable(LOCAL_LOG_ID);
+        Offsets.Mutable unreconciled = new Offsets.Mutable(LOCAL_LOG_ID);
         log.collectOffsetsFor(token, tableId, includePending, unreconciled, 
reconciled);
 
         for (MutationId mid : expectedIds)
-            Assert.assertTrue(unreconciled.contains(mid.offset()));
+            assertTrue(unreconciled.contains(mid.offset()));
 
-        Assert.assertEquals(toOffsets(expectedIds), unreconciled);
-        Assert.assertEquals(expectedReconciled, reconciled);
+        assertEquals(toOffsets(expectedIds), unreconciled);
+        assertEquals(expectedReconciled, reconciled);
     }
 
     @Test
@@ -91,24 +102,18 @@ public class CoordinatorLogTest
         Token tk = tk("key");
         TableMetadata metadata = Schema.instance.getTableMetadata(KEYSPACE, 
TABLE);
         TableId tableId = metadata.id;
-        CoordinatorLogPrimary log = new CoordinatorLogPrimary(LOCAL_HOST_ID, 
LOG_ID, PARTICIPANTS);
+        CoordinatorLogPrimary log = new CoordinatorLogPrimary(KEYSPACE, new 
Range<>(tk, tk), LOCAL_HOST_ID, LOCAL_LOG_ID, PARTICIPANTS);
         MutationId[] ids = new MutationId[] { log.nextId(), log.nextId(), 
log.nextId(), };
 
         List<Mutation> mutations = new ArrayList<>(ids.length);
         for (MutationId id : ids)
         {
-            Mutation mutation =
-                new RowUpdateBuilder(metadata, 0, "key")
-                .clustering("ck")
-                .add("value", "value")
-                .build()
-                .withMutationId(id);
-
+            Mutation mutation = createMutation(id);
             mutations.add(mutation);
             log.startWriting(mutation);
         }
 
-        Offsets.Mutable reconciled = new Offsets.Mutable(LOG_ID);
+        Offsets.Mutable reconciled = new Offsets.Mutable(LOCAL_LOG_ID);
         // we've only started writing, so the ids shouldn't appear without 
includePending being true
         assertUnreconciled(tk, tableId, log, false, reconciled);
         assertUnreconciled(tk, tableId, log, true, reconciled, ids);
@@ -126,4 +131,86 @@ public class CoordinatorLogTest
         reconciled.add(ids[0].offset());
         assertUnreconciled(tk, tableId, log, false, reconciled, ids[1], 
ids[2]);
     }
+
+    private Mutation createMutation(MutationId id)
+    {
+        TableMetadata metadata = Schema.instance.getTableMetadata(KEYSPACE, 
TABLE);
+        return new RowUpdateBuilder(metadata, 0, "key")
+            .clustering("ck")
+            .add("value", "value")
+            .build()
+            .withMutationId(id);
+    }
+
+    @Test
+    public void persistAndLoadPrimaryLogTest()
+    {
+        testPersistAndLoadRoundtrip(LOCAL_LOG_ID);
+    }
+
+    @Test
+    public void persistAndLoadReplicaLogTest()
+    {
+        testPersistAndLoadRoundtrip(REPLICA_LOG_ID);
+    }
+
+    private void testPersistAndLoadRoundtrip(CoordinatorLogId logId)
+    {
+        Range<Token> range = new Range<>(tk("a"), tk("b"));
+
+        Offsets.Mutable offsets1 = new Offsets.Mutable(logId);
+        offsets1.add(1, 2, 3, 4);
+        Offsets.Mutable offsets2 = new Offsets.Mutable(logId);
+        offsets2.add(2, 3, 4, 5);
+        Offsets.Mutable offsets3 = new Offsets.Mutable(logId);
+        offsets3.add(3, 4, 5, 6);
+
+        Node2OffsetsMap witnessed = new Node2OffsetsMap();
+        witnessed.set(LOCAL_HOST_ID, offsets1);
+        witnessed.set(REMOTE_HOST_ID_1, offsets2);
+        witnessed.set(REMOTE_HOST_ID_2, offsets3);
+
+        UnreconciledMutations unreconciled = new UnreconciledMutations();
+        Mutation mutation1 = createMutation(new MutationId(logId.asLong(), 
MutationId.sequenceId(1, 0)));
+        Mutation mutation2 = createMutation(new MutationId(logId.asLong(), 
MutationId.sequenceId(2, 0)));
+        unreconciled.addDirectly(mutation1);
+        unreconciled.addDirectly(mutation2);
+        MutationJournal.instance.write(mutation1.id(), mutation1);
+        MutationJournal.instance.write(mutation2.id(), mutation2);
+
+        CoordinatorLog log =
+            CoordinatorLog.recreate(KEYSPACE, range, LOCAL_HOST_ID, logId, 
PARTICIPANTS, witnessed, witnessed, unreconciled);
+
+        Offsets.Mutable reconciled = new Offsets.Mutable(logId);
+        reconciled.add(3, 4);
+        assertEquals(reconciled, log.reconciledOffsets);
+
+        validatePersistAndLoadRoundtrip(log);
+        log.deleteFromSystemTable();
+    }
+
+    private static void validatePersistAndLoadRoundtrip(CoordinatorLog log)
+    {
+        log.persistToSystemTable();
+        List<CoordinatorLog> logs = 
CoordinatorLog.loadFromSystemTable(KEYSPACE, log.range, LOCAL_HOST_ID);
+        assertEquals(1, logs.size());
+        CoordinatorLog loaded = logs.get(0);
+
+        assertSame(log.getClass(), loaded.getClass());
+        assertEquals(log.keyspace, loaded.keyspace);
+        assertEquals(log.range, loaded.range);
+        assertEquals(log.logId, loaded.logId);
+        assertEquals(log.participants, loaded.participants);
+        assertEquals(log.localNodeId, loaded.localNodeId);
+
+        assertEquals(log.participants.size(), log.witnessedOffsets.size());
+        assertEquals(log.participants.size(), log.persistedOffsets.size());
+        assertEquals(loaded.participants.size(), 
loaded.witnessedOffsets.size());
+        assertEquals(loaded.participants.size(), 
loaded.persistedOffsets.size());
+        assertEquals(log.witnessedOffsets, loaded.witnessedOffsets);
+        assertEquals(log.persistedOffsets, loaded.persistedOffsets);
+        assertEquals(log.reconciledOffsets, loaded.reconciledOffsets);
+
+        
assertTrue(log.unreconciledMutations.equalsForTesting(loaded.unreconciledMutations));
+    }
 }
diff --git a/test/unit/org/apache/cassandra/replication/OffsetsTest.java 
b/test/unit/org/apache/cassandra/replication/OffsetsTest.java
index 61c0a9ba5f..b0d5edda0f 100644
--- a/test/unit/org/apache/cassandra/replication/OffsetsTest.java
+++ b/test/unit/org/apache/cassandra/replication/OffsetsTest.java
@@ -843,21 +843,27 @@ public class OffsetsTest
     public void testRemoveWithExactSizedArray()
     {
         {
-            Offsets.Mutable offsets = new Offsets.Mutable(LOG_ID, new int[] 
{10, 11}, 2);
+            Offsets.Mutable offsets = new Offsets.Mutable(LOG_ID, new int[]{ 
10, 11 }, 2);
             offsets.remove(10);
             assertOffsetsEqual(offsets(11, 11), offsets);
         }
 
         {
-            Offsets.Mutable offsets = new Offsets.Mutable(LOG_ID, new int[] 
{10, 11}, 2);
+            Offsets.Mutable offsets = new Offsets.Mutable(LOG_ID, new int[]{ 
10, 11 }, 2);
             offsets.remove(11);
             assertOffsetsEqual(offsets(10, 10), offsets);
         }
 
         {
-            Offsets.Mutable offsets = new Offsets.Mutable(LOG_ID, new int[] 
{11, 11}, 2);
+            Offsets.Mutable offsets = new Offsets.Mutable(LOG_ID, new int[]{ 
11, 11 }, 2);
             offsets.remove(11);
             assertOffsetsEqual(offsets(), offsets);
         }
     }
+
+    @Test // was missing: without the annotation JUnit 4 never runs this method
+    public void asListFromListRoundTripTest()
+    {
+        for (Offsets.Mutable offsets : new Offsets.Mutable[] { offsets(), offsets(1, 2), offsets(1, 3, 7, 9) })
+            assertOffsetsEqual(offsets, Offsets.fromList(LOG_ID, offsets.asList()));
+    }
 }
diff --git a/test/unit/org/apache/cassandra/replication/ShardTest.java 
b/test/unit/org/apache/cassandra/replication/ShardTest.java
new file mode 100644
index 0000000000..d8c62554f1
--- /dev/null
+++ b/test/unit/org/apache/cassandra/replication/ShardTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.LongSupplier;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.agrona.collections.MutableInteger;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.junit.Assert.assertEquals;
+
+public class ShardTest
+{
+    private static final int LOCAL_HOST_ID = 1;
+    private static final int REMOTE_HOST_ID_1 = 2;
+    private static final int REMOTE_HOST_ID_2 = 3;
+
+    private static final String KEYSPACE = "shard_test_ks";
+    private static final String TABLE = "shard_test_table";
+
+    @BeforeClass
+    public static void setUp() throws IOException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(3),
+                                    TableMetadata.builder(KEYSPACE, TABLE)
+                                                 .addPartitionKeyColumn("pk", 
UTF8Type.instance)
+                                                 .addClusteringColumn("ck", 
UTF8Type.instance)
+                                                 .addRegularColumn("value", 
UTF8Type.instance)
+                                                 .build());
+        MutationJournal.instance.start();
+    }
+
+    private static Token tk(String key)
+    {
+        return new 
ByteOrderedPartitioner.BytesToken(ByteBufferUtil.bytes(key));
+    }
+
+    @Test
+    public void testPersistAndLoadSingleShard()
+    {
+        Range<Token> range = new Range<>(tk("a"), tk("z"));
+        Participants participants = new Participants(List.of(LOCAL_HOST_ID, 
REMOTE_HOST_ID_1, REMOTE_HOST_ID_2));
+        MutableInteger logId = new MutableInteger();
+        LongSupplier logIdProvider = () -> 
CoordinatorLogId.asLong(LOCAL_HOST_ID, logId.getAndIncrement());
+
+        Shard original = new Shard(LOCAL_HOST_ID, KEYSPACE, range, 
participants, logIdProvider, (s, l) -> {});
+        original.persistToSystemTables();
+
+        ArrayList<Shard> loadedShards = 
Shard.loadFromSystemTables(LOCAL_HOST_ID, logIdProvider, (s, l) -> {});
+        assertEquals(1, loadedShards.size());
+        Shard loaded = loadedShards.get(0);
+
+        assertEquals(original.localNodeId, loaded.localNodeId);
+        assertEquals(original.keyspace, loaded.keyspace);
+        assertEquals(original.range, loaded.range);
+        assertEquals(original.participants, loaded.participants);
+        // TODO: compare the coordinator logs
+    }
+
+    @Test
+    public void testLogRotation()
+    {
+        CoordinatorLog.overrideMaxOffsetForTesting(100);
+        try
+        {
+            Range<Token> range = new Range<>(tk("a"), tk("z"));
+            Participants participants = new 
Participants(List.of(LOCAL_HOST_ID, REMOTE_HOST_ID_1, REMOTE_HOST_ID_2));
+            MutableInteger logId = new MutableInteger();
+            LongSupplier logIdProvider = () -> 
CoordinatorLogId.asLong(LOCAL_HOST_ID, logId.getAndIncrement());
+            Shard shard = new Shard(LOCAL_HOST_ID, KEYSPACE, range, 
participants, logIdProvider, (s, l) -> {
+            });
+
+            MutationId firstId = shard.nextId();
+            for (int i = 0; i < 100; i++)
+                assertEquals(firstId.hostLogId, shard.nextId().hostLogId);
+            assertEquals(firstId.hostLogId + 1, shard.nextId().hostLogId);
+        }
+        finally
+        {
+            CoordinatorLog.overrideMaxOffsetForTesting(Integer.MAX_VALUE);
+        }
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java 
b/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java
index dfa025a327..281c8da079 100644
--- a/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java
+++ b/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java
@@ -43,10 +43,13 @@ import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static org.junit.Assert.assertEquals;
+
 public class UnreconciledMutationsTest
 {
     private static final String KEYSPACE = "ks";
     private static final String TABLE = "tbl";
+    private static TableId TABLE_ID;
 
     @BeforeClass
     public static void setUp() throws IOException
@@ -57,6 +60,7 @@ public class UnreconciledMutationsTest
                                                  .addPartitionKeyColumn("k", 
Int32Type.instance)
                                                  .addRegularColumn("v", 
Int32Type.instance)
                                                  .build());
+        TABLE_ID = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
     }
 
     private static Token tokenFor(int key)
@@ -69,9 +73,7 @@ public class UnreconciledMutationsTest
         TableMetadata metadata = Schema.instance.getTableMetadata(KEYSPACE, 
TABLE);
         
         // Create a MutationId with logId 1L and sequenceId that produces the 
desired offset
-        long logId = 1L;
-        long sequenceId = ((long) offset << 32) | 1000; // offset in high 
bits, timestamp in low bits
-        MutationId mutationId = new MutationId(logId, sequenceId);
+        MutationId mutationId = new MutationId(CoordinatorLogId.asLong(1, 1), 
MutationId.sequenceId(offset, 0));
         
         Mutation mutation = new RowUpdateBuilder(metadata, 0, partitionKey)
                            .add("v", value)
@@ -90,8 +92,7 @@ public class UnreconciledMutationsTest
     public void testSingleTokenCollectionIsolation()
     {
         UnreconciledMutations unreconciled = new UnreconciledMutations();
-        TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
-        
+
         // Create mutations for different partition keys with different tokens
         Mutation mutation1 = createMutation(100, 1000, 1);
         Mutation mutation2 = createMutation(200, 2000, 2);
@@ -113,30 +114,30 @@ public class UnreconciledMutationsTest
         
         // Test single token collection for token1 - should ONLY return 
mutation1
         Offsets.Mutable offsets1 = new Offsets.Mutable(new CoordinatorLogId(1, 
1));
-        boolean found1 = unreconciled.collect(token1, tableId, false, 
offsets1);
+        boolean found1 = unreconciled.collect(token1, TABLE_ID, false, 
offsets1);
         
         Assert.assertTrue("Should find mutations for token1", found1);
-        Assert.assertEquals("Should only contain 1 offset for token1", 1, 
offsets1.offsetCount());
+        assertEquals("Should only contain 1 offset for token1", 1, 
offsets1.offsetCount());
         Assert.assertTrue("Should contain offset 1 for mutation1", 
offsets1.contains(1));
         Assert.assertFalse("Should NOT contain offset 2 for mutation2", 
offsets1.contains(2));
         Assert.assertFalse("Should NOT contain offset 3 for mutation3", 
offsets1.contains(3));
         
         // Test single token collection for token2 - should ONLY return 
mutation2
         Offsets.Mutable offsets2 = new Offsets.Mutable(new CoordinatorLogId(1, 
1));
-        boolean found2 = unreconciled.collect(token2, tableId, false, 
offsets2);
+        boolean found2 = unreconciled.collect(token2, TABLE_ID, false, 
offsets2);
         
         Assert.assertTrue("Should find mutations for token2", found2);
-        Assert.assertEquals("Should only contain 1 offset for token2", 1, 
offsets2.offsetCount());
+        assertEquals("Should only contain 1 offset for token2", 1, 
offsets2.offsetCount());
         Assert.assertTrue("Should contain offset 2 for mutation2", 
offsets2.contains(2));
         Assert.assertFalse("Should NOT contain offset 1 for mutation1", 
offsets2.contains(1));
         Assert.assertFalse("Should NOT contain offset 3 for mutation3", 
offsets2.contains(3));
         
         // Test single token collection for token3 - should ONLY return 
mutation3
         Offsets.Mutable offsets3 = new Offsets.Mutable(new CoordinatorLogId(1, 
1));
-        boolean found3 = unreconciled.collect(token3, tableId, false, 
offsets3);
+        boolean found3 = unreconciled.collect(token3, TABLE_ID, false, 
offsets3);
         
         Assert.assertTrue("Should find mutations for token3", found3);
-        Assert.assertEquals("Should only contain 1 offset for token3", 1, 
offsets3.offsetCount());
+        assertEquals("Should only contain 1 offset for token3", 1, 
offsets3.offsetCount());
         Assert.assertTrue("Should contain offset 3 for mutation3", 
offsets3.contains(3));
         Assert.assertFalse("Should NOT contain offset 1 for mutation1", 
offsets3.contains(1));
         Assert.assertFalse("Should NOT contain offset 2 for mutation2", 
offsets3.contains(2));
@@ -146,11 +147,10 @@ public class UnreconciledMutationsTest
     public void testEmptyCollection()
     {
         UnreconciledMutations unreconciled = new UnreconciledMutations();
-        TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
         Token token = tokenFor(100);
         
         Offsets.Mutable offsets = new Offsets.Mutable(new CoordinatorLogId(1, 
1));
-        boolean found = unreconciled.collect(token, tableId, false, offsets);
+        boolean found = unreconciled.collect(token, TABLE_ID, false, offsets);
         
         Assert.assertFalse("Should not find any mutations in empty 
collection", found);
         Assert.assertTrue("Should have no offsets", offsets.isEmpty());
@@ -160,8 +160,7 @@ public class UnreconciledMutationsTest
     public void testPendingVsVisibleMutations()
     {
         UnreconciledMutations unreconciled = new UnreconciledMutations();
-        TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
-        
+
         Mutation pendingMutation = createMutation(100, 1000, 1);
         Mutation visibleMutation = createMutation(100, 2000, 2);
         Token token = pendingMutation.key().getToken();
@@ -173,19 +172,19 @@ public class UnreconciledMutationsTest
         
         // Test without including pending - should only get visible mutation
         Offsets.Mutable visibleOnly = new Offsets.Mutable(new 
CoordinatorLogId(1, 1));
-        boolean foundVisible = unreconciled.collect(token, tableId, false, 
visibleOnly);
+        boolean foundVisible = unreconciled.collect(token, TABLE_ID, false, 
visibleOnly);
         
         Assert.assertTrue("Should find visible mutations", foundVisible);
-        Assert.assertEquals("Should only have 1 visible mutation", 1, 
visibleOnly.offsetCount());
+        assertEquals("Should only have 1 visible mutation", 1, 
visibleOnly.offsetCount());
         Assert.assertTrue("Should contain visible mutation offset", 
visibleOnly.contains(2));
         Assert.assertFalse("Should NOT contain pending mutation offset", 
visibleOnly.contains(1));
         
         // Test including pending - should get both mutations
         Offsets.Mutable includingPending = new Offsets.Mutable(new 
CoordinatorLogId(1, 1));
-        boolean foundAll = unreconciled.collect(token, tableId, true, 
includingPending);
+        boolean foundAll = unreconciled.collect(token, TABLE_ID, true, 
includingPending);
         
         Assert.assertTrue("Should find all mutations", foundAll);
-        Assert.assertEquals("Should have 2 mutations total", 2, 
includingPending.offsetCount());
+        assertEquals("Should have 2 mutations total", 2, 
includingPending.offsetCount());
         Assert.assertTrue("Should contain pending mutation offset", 
includingPending.contains(1));
         Assert.assertTrue("Should contain visible mutation offset", 
includingPending.contains(2));
     }
@@ -194,8 +193,7 @@ public class UnreconciledMutationsTest
     public void testMultipleMutationsSameToken()
     {
         UnreconciledMutations unreconciled = new UnreconciledMutations();
-        TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
-        
+
         // Create multiple mutations for the same partition key (same token)
         Mutation mutation1 = createMutation(100, 1000, 1);
         Mutation mutation2 = createMutation(100, 2000, 2);
@@ -211,10 +209,10 @@ public class UnreconciledMutationsTest
         unreconciled.finishWriting(mutation3);
         
         Offsets.Mutable offsets = new Offsets.Mutable(new CoordinatorLogId(1, 
1));
-        boolean found = unreconciled.collect(token, tableId, false, offsets);
+        boolean found = unreconciled.collect(token, TABLE_ID, false, offsets);
         
         Assert.assertTrue("Should find mutations for token", found);
-        Assert.assertEquals("Should have all 3 mutations for same token", 3, 
offsets.offsetCount());
+        assertEquals("Should have all 3 mutations for same token", 3, 
offsets.offsetCount());
         Assert.assertTrue("Should contain offset 1", offsets.contains(1));
         Assert.assertTrue("Should contain offset 2", offsets.contains(2));
         Assert.assertTrue("Should contain offset 3", offsets.contains(3));
@@ -224,8 +222,7 @@ public class UnreconciledMutationsTest
     public void testTableIdFiltering()
     {
         UnreconciledMutations unreconciled = new UnreconciledMutations();
-        TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
-        
+
         // Create a fake different table ID
         TableId differentTableId = TableId.generate();
         
@@ -237,10 +234,10 @@ public class UnreconciledMutationsTest
         
         // Query with correct table ID - should find mutation
         Offsets.Mutable correctTable = new Offsets.Mutable(new 
CoordinatorLogId(1, 1));
-        boolean foundCorrect = unreconciled.collect(token, tableId, false, 
correctTable);
+        boolean foundCorrect = unreconciled.collect(token, TABLE_ID, false, 
correctTable);
         
         Assert.assertTrue("Should find mutation for correct table", 
foundCorrect);
-        Assert.assertEquals("Should have 1 mutation", 1, 
correctTable.offsetCount());
+        assertEquals("Should have 1 mutation", 1, correctTable.offsetCount());
         
         // Query with different table ID - should find nothing
         Offsets.Mutable differentTable = new Offsets.Mutable(new 
CoordinatorLogId(1, 1));
@@ -254,8 +251,7 @@ public class UnreconciledMutationsTest
     public void testMutationRemoval()
     {
         UnreconciledMutations unreconciled = new UnreconciledMutations();
-        TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
-        
+
         Mutation mutation = createMutation(100, 1000, 1);
         Token token = mutation.key().getToken();
         
@@ -264,16 +260,16 @@ public class UnreconciledMutationsTest
         
         // Verify mutation is present
         Offsets.Mutable beforeRemoval = new Offsets.Mutable(new 
CoordinatorLogId(1, 1));
-        boolean foundBefore = unreconciled.collect(token, tableId, false, 
beforeRemoval);
+        boolean foundBefore = unreconciled.collect(token, TABLE_ID, false, 
beforeRemoval);
         Assert.assertTrue("Should find mutation before removal", foundBefore);
-        Assert.assertEquals("Should have 1 mutation before removal", 1, 
beforeRemoval.offsetCount());
+        assertEquals("Should have 1 mutation before removal", 1, 
beforeRemoval.offsetCount());
         
         // Remove the mutation
         unreconciled.remove(1);
         
         // Verify mutation is gone
         Offsets.Mutable afterRemoval = new Offsets.Mutable(new 
CoordinatorLogId(1, 1));
-        boolean foundAfter = unreconciled.collect(token, tableId, false, 
afterRemoval);
+        boolean foundAfter = unreconciled.collect(token, TABLE_ID, false, 
afterRemoval);
         Assert.assertFalse("Should not find mutation after removal", 
foundAfter);
         Assert.assertTrue("Should have no mutations after removal", 
afterRemoval.isEmpty());
     }
@@ -282,8 +278,7 @@ public class UnreconciledMutationsTest
     public void testTokenRangeCollection()
     {
         UnreconciledMutations unreconciled = new UnreconciledMutations();
-        TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
-        
+
         // Create mutations with different tokens and sort them
         List<Integer> keys = List.of(100, 200, 300, 400, 500);
         List<Mutation> mutations = new ArrayList<>();
@@ -311,7 +306,7 @@ public class UnreconciledMutationsTest
             new Bounds<>(startKey, endKey);
         
         Offsets.Mutable rangeOffsets = new Offsets.Mutable(new 
CoordinatorLogId(1, 1));
-        boolean foundRange = unreconciled.collect(bounds, tableId, false, 
rangeOffsets);
+        boolean foundRange = unreconciled.collect(bounds, TABLE_ID, false, 
rangeOffsets);
         
         Assert.assertTrue("Should find mutations in range", foundRange);
         // Should include mutations at positions 0, 1, and 2 (first through 
middle inclusive)
@@ -322,8 +317,7 @@ public class UnreconciledMutationsTest
     public void testSingleTokenRangeCollectionBug()
     {
         UnreconciledMutations unreconciled = new UnreconciledMutations();
-        TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
-        
+
         // Create mutations for different partition keys with different tokens
         Mutation mutation1 = createMutation(100, 1000, 1);
         Mutation mutation2 = createMutation(200, 2000, 2);
@@ -342,11 +336,11 @@ public class UnreconciledMutationsTest
         // Test single token range collection
         Offsets.Mutable offsets = new Offsets.Mutable(new CoordinatorLogId(1, 
1));
         Bounds<PartitionPosition> bounds = new Bounds<>(mutation1.key(), 
mutation1.key());
-        boolean found = unreconciled.collect(bounds, tableId, false, offsets);
+        boolean found = unreconciled.collect(bounds, TABLE_ID, false, offsets);
         
         // This should only return mutation1
         Assert.assertTrue("Should find mutations for single token range", 
found);
-        Assert.assertEquals("Single token range should only return 1 
mutation", 1, offsets.offsetCount());
+        assertEquals("Single token range should only return 1 mutation", 1, 
offsets.offsetCount());
         Assert.assertTrue("Should contain mutation1 offset", 
offsets.contains(1));
 
         // other mutations should not be included
@@ -358,8 +352,7 @@ public class UnreconciledMutationsTest
     public void testFullRangeCollectionWithMinimumToken()
     {
         UnreconciledMutations unreconciled = new UnreconciledMutations();
-        TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
-        
+
         // Create mutations for different partition keys with different tokens
         Mutation mutation1 = createMutation(100, 1000, 1);
         Mutation mutation2 = createMutation(200, 2000, 2);
@@ -382,11 +375,11 @@ public class UnreconciledMutationsTest
 
         // Test full range collection - this SHOULD return ALL mutations
         Offsets.Mutable offsets = new Offsets.Mutable(new CoordinatorLogId(1, 
1));
-        boolean found = unreconciled.collect(fullRange, tableId, false, 
offsets);
+        boolean found = unreconciled.collect(fullRange, TABLE_ID, false, 
offsets);
         
         // Full range should return ALL mutations
         Assert.assertTrue("Should find mutations for full range", found);
-        Assert.assertEquals("Full range should return ALL mutations", 3, 
offsets.offsetCount());
+        assertEquals("Full range should return ALL mutations", 3, 
offsets.offsetCount());
         Assert.assertTrue("Should contain mutation1 offset", 
offsets.contains(1));
         Assert.assertTrue("Should contain mutation2 offset", 
offsets.contains(2));
         Assert.assertTrue("Should contain mutation3 offset", 
offsets.contains(3));
@@ -396,8 +389,7 @@ public class UnreconciledMutationsTest
     public void testSingleTokenCollectionVsFullRange()
     {
         UnreconciledMutations unreconciled = new UnreconciledMutations();
-        TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
-        
+
         // Create mutations with different tokens
         Mutation mutation1 = createMutation(100, 1000, 1);
         Mutation mutation2 = createMutation(200, 2000, 2);
@@ -415,10 +407,10 @@ public class UnreconciledMutationsTest
         
         // Single token collection should only return mutation1
         Offsets.Mutable singleToken = new Offsets.Mutable(new 
CoordinatorLogId(1, 1));
-        boolean foundSingle = unreconciled.collect(targetToken, tableId, 
false, singleToken);
+        boolean foundSingle = unreconciled.collect(targetToken, TABLE_ID, 
false, singleToken);
         
         Assert.assertTrue("Should find mutation for single token", 
foundSingle);
-        Assert.assertEquals("Single token should only return 1 mutation", 1, 
singleToken.offsetCount());
+        assertEquals("Single token should only return 1 mutation", 1, 
singleToken.offsetCount());
         Assert.assertTrue("Should contain mutation1 offset", 
singleToken.contains(1));
         
         // This is the CRITICAL test - single token collection must NOT return 
all mutations
@@ -426,4 +418,33 @@ public class UnreconciledMutationsTest
         Assert.assertFalse("Single token collection should NOT contain 
mutation2", singleToken.contains(2));
         Assert.assertFalse("Single token collection should NOT contain 
mutation3", singleToken.contains(3));
     }
+
+    @Test
+    public void testLoadFromJournal()
+    {
+        MutationJournal.instance.start();
+
+        CoordinatorLogId logId = new CoordinatorLogId(1, 1);
+
+        Offsets.Mutable offsets1 = new Offsets.Mutable(logId);
+        offsets1.add(1, 2, 3, 4, 5, 6, 7);
+
+        Offsets.Mutable offsets2 = new Offsets.Mutable(logId);
+        offsets2.add(1, 2, 3, 4, 5);
+
+        Node2OffsetsMap witnessed = new Node2OffsetsMap();
+        witnessed.set(1, offsets1);
+        witnessed.set(2, offsets2);
+
+        Mutation mutation6 = createMutation(6, 6, 6);
+        Mutation mutation7 = createMutation(7, 7, 7);
+        MutationJournal.instance.write(mutation6.id(), mutation6);
+        MutationJournal.instance.write(mutation7.id(), mutation7);
+
+        Offsets.Mutable loadedOffsets = new Offsets.Mutable(logId);
+        UnreconciledMutations unreconciled = 
UnreconciledMutations.loadFromJournal(witnessed, 1);
+        unreconciled.collect(mutation6.key().getToken(), TABLE_ID, false, 
loadedOffsets);
+        unreconciled.collect(mutation7.key().getToken(), TABLE_ID, false, 
loadedOffsets);
+        assertEquals(List.of(6, 7), loadedOffsets.asList());
+    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to