iamaleksey commented on code in PR #4360:
URL: https://github.com/apache/cassandra/pull/4360#discussion_r2344471098


##########
src/java/org/apache/cassandra/replication/CoordinatorLog.java:
##########
@@ -343,16 +402,119 @@ private long nextSequenceId()
 
     static class CoordinatorLogReplica extends CoordinatorLog
     {
-        CoordinatorLogReplica(int localHostId, CoordinatorLogId logId, Participants participants)
+        CoordinatorLogReplica(
+            String keyspace, Range<Token> range, int localNodeId, CoordinatorLogId logId, Participants participants,
+            Node2OffsetsMap witnessedOffsets, Node2OffsetsMap persistedOffsets)
         {
-            super(localHostId, logId, participants);
+            super(keyspace, range, localNodeId, logId, participants, witnessedOffsets, persistedOffsets);
+        }
+
+        CoordinatorLogReplica(String keyspace, Range<Token> range, int localNodeId, CoordinatorLogId logId, Participants participants)
+        {
+            super(keyspace, range, localNodeId, logId, participants);
         }
 
         @Override
-        void receivedWriteResponse(ShortMutationId mutationId, int fromHostId)
+        void receivedWriteResponse(ShortMutationId mutationId, int fromNodeId)
         {
             // no-op
         }
     }
 
+    /*
+     * Persist to / load from system table.
+     */
+
+    private static final String INSERT_QUERY =
+        format("INSERT INTO %s.%s (keyspace_name, range_start, range_end, host_id, host_log_id, participants, witnessed_offsets, persisted_offsets) "
+               + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
+               SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.COORDINATOR_LOGS);
+
+    void persistToSystemTable()
+    {
+        Map<Integer, List<Integer>> witnessed = new Int2ObjectHashMap<>();
+        Map<Integer, List<Integer>> persisted = new Int2ObjectHashMap<>();
+
+        lock.readLock().lock();
+        try
+        {
+            witnessedOffsets.convertToPrimitiveMap(witnessed);
+            persistedOffsets.convertToPrimitiveMap(persisted);
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
+        executeInternal(INSERT_QUERY, keyspace, range.left.toString(), range.right.toString(), logId.hostId,
+                        logId.hostLogId, participants.asSet(), witnessed, persisted);
+    }
+
+    void updateLogsInSystemTable()
+    {
+        Offsets.Mutable localWitnessed;
+        Map<Integer, List<Integer>> witnessed = new Int2ObjectHashMap<>();
+        Map<Integer, List<Integer>> persisted = new Int2ObjectHashMap<>();
+
+        lock.readLock().lock();
+        try
+        {
+            localWitnessed = Offsets.Mutable.copy(witnessedOffsets.get(localNodeId));
+
+            witnessedOffsets.convertToPrimitiveMap(witnessed);
+            persistedOffsets.convertToPrimitiveMap(persisted);

Review Comment:
   I'm not sure about this one. We must overwrite the entire value of each list 
in the system table because of the representation, but that shouldn't be a big 
deal: the size of each list should stay roughly the same (the head, with no 
gaps, collapsed into one range, plus the slightly sparse tail). I don't see how 
to make it incremental in this context, or what it would bring. I could easily 
be missing something though.
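
To illustrate the size argument above, here is a minimal sketch of collapsing sorted offsets into a flat list of range bounds. It assumes the persisted form is a list of `[start, end]` pairs; `OffsetRanges` and `collapse` are hypothetical names for illustration only, not the `Offsets` or `convertToPrimitiveMap` code from the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; this is not the Offsets class from the PR.
final class OffsetRanges
{
    // Collapse sorted, distinct offsets into a flat [start, end, start, end, ...] list.
    static List<Integer> collapse(int[] sortedOffsets)
    {
        List<Integer> ranges = new ArrayList<>();
        int i = 0;
        while (i < sortedOffsets.length)
        {
            int start = sortedOffsets[i];
            int end = start;
            // Extend the current range while the next offset is contiguous.
            while (i + 1 < sortedOffsets.length && sortedOffsets[i + 1] == end + 1)
                end = sortedOffsets[++i];
            ranges.add(start);
            ranges.add(end);
            i++;
        }
        return ranges;
    }
}
```

Under that assumption, a gap-free head of any length costs two integers, so the list grows only with the number of gaps in the tail. Once the gaps fill in, the whole list collapses back to a single pair.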



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
