urbandan commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1171304080


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            StringBuilder stateString = new StringBuilder();
+            stateString.append("[");
+            for (int i = 0; i < Long.SIZE; i++) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                    // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,
+                    stateString.append(mutableSyncs[i].upstreamOffset());
+                    stateString.append(":");
+                    stateString.append(mutableSyncs[i].downstreamOffset());
+                }
+            }
+            stateString.append("]");
+            log.trace("New sync {} applied, new state is {}", offsetSync, stateString);
+        }
+        return mutableSyncs;
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        for (int i = 0; i < Long.SIZE; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   One idea for this:
   What if checkpointing used Connect's source offset mechanism to keep track of the last offset sync used for a specific consumer group?
   MirrorCheckpointTask could start emitting a source offset record like
   (group, topic, partition) -> (offsetOfLastUsedOffsetSync)
   
   This would allow
   1. the OffsetSyncStore to read compaction-eligible offset syncs at startup, since it would be able to detect that (going with the example from @gharris1727) it cannot use offset-sync B, because the last saved offset in the offset-syncs topic was that of C (which has a higher offset than B).
   2. consumer group offsets to be rewound if the source topic was recreated (offset sync record offsets keep increasing regardless of the reset in the upstream offset).
   
   I understand that in terms of complexity this is pretty much the same as reading back checkpoints, but the implementation should be simpler (?). One drawback is that deleting the offset-syncs topic would break the offset translation logic, but I guess with KIP-875 (or with the follow-up to that KIP containing the "reset" functionality) that won't be a problem, as the checkpoint source offsets can be cleared to restore a clean state.
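
   To make the idea concrete, here is a rough, dependency-free sketch of the bookkeeping. The class and method names (`LastUsedSyncTracker`, `markUsed`, `canUse`) are made up for illustration and are not part of MirrorMaker or Connect; in a real implementation the map would be backed by Connect source offsets rather than held in memory.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not an existing Kafka class.
// Remembers, per (group, topic, partition), the offset *within the offset-syncs topic*
// of the last offset sync record that was used to translate a checkpoint.
final class LastUsedSyncTracker {
    private final Map<List<Object>, Long> lastUsed = new HashMap<>();

    private static List<Object> key(String group, String topic, int partition) {
        return Arrays.<Object>asList(group, topic, partition);
    }

    // Record that the sync stored at syncRecordOffset was used; never moves backwards.
    void markUsed(String group, String topic, int partition, long syncRecordOffset) {
        lastUsed.merge(key(group, topic, partition), syncRecordOffset, Math::max);
    }

    // A sync read back at startup is usable only if it is not older (by its record
    // offset in the offset-syncs topic) than the last sync already used for the group.
    // The upstream offset carried by the sync is deliberately ignored here, so a
    // newer sync with a smaller upstream offset (recreated source topic) is accepted.
    boolean canUse(String group, String topic, int partition, long syncRecordOffset) {
        Long last = lastUsed.get(key(group, topic, partition));
        return last == null || syncRecordOffset >= last;
    }
}
```

   With this, if sync C (at a higher record offset) was the last one used, an older sync B read back at startup is rejected, while any later sync is accepted even if its upstream offset went backwards.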



