gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1176205390


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +180,141 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) ->
+                syncs == null ? createInitialSyncs(offsetSync) : 
updateExistingSyncs(syncs, offsetSync)
+        );
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+        // TODO: consider batching updates so that this copy can be performed 
less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            log.trace("New sync {} applied, new state is {}", offsetSync, 
offsetArrayToString(mutableSyncs));
+        }
+        return mutableSyncs;
+    }
+
+    private String offsetArrayToString(OffsetSync[] syncs) {
+        StringBuilder stateString = new StringBuilder();
+        stateString.append("[");
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            if (i == 0 || syncs[i] != syncs[i - 1]) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                // Print only if the sync is interesting, a series of repeated 
syncs will be elided
+                stateString.append(syncs[i].upstreamOffset());
+                stateString.append(":");
+                stateString.append(syncs[i].downstreamOffset());
+            }
+        }
+        stateString.append("]");
+        return stateString.toString();
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // If every element of the store is the same, then it satisfies 
invariants B and C trivially.
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        OffsetSync replacement = offsetSync;
+        OffsetSync oldValue = syncs[0];
+        // Invariant A is always violated once a new sync appears.
+        // Repair Invariant A: the latest sync must always be updated
+        syncs[0] = replacement;
+        for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
+            int previous = current - 1;
+
+            // Consider using oldValue instead of replacement, which allows us 
to keep more distinct values stored
+            // If oldValue is not recent, it should be expired from the store
+            boolean isRecent = invariantB(syncs[previous], oldValue, previous, 
current);
+            // Ensure that this value is sufficiently separated from the 
previous value
+            // We prefer to keep more recent syncs of similar precision (i.e. 
the value in replacement)
+            boolean separatedFromPrevious = invariantC(syncs[previous], 
oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the next 
value
+            // We prefer to keep existing syncs of lower precision (i.e. the 
value in syncs[next])
+            int next = current + 1;
+            boolean separatedFromNext = next >= SYNCS_PER_PARTITION || 
invariantC(oldValue, syncs[next], current, next);
+            // If this condition is false, oldValue will be expired from the 
store and lost forever.
+            if (isRecent && separatedFromPrevious && separatedFromNext) {
+                replacement = oldValue;
+            }
+
+            // The replacement variable always contains a value which 
satisfies the invariants for this index.
+            assert invariantB(syncs[previous], replacement, previous, current);
+            assert invariantC(syncs[previous], replacement, previous, current);
+
+            // Test if changes to the previous index affected the invariant 
for this index
+            if (invariantB(syncs[previous], syncs[current], previous, 
current)) {
+                // Invariant B holds for syncs[current]: it must also hold for 
all later values
+                break;
+            } else {
+                // Invariant B violated for syncs[current]: sync is now too 
old and must be updated
+                // Repair Invariant B: swap in replacement, and save the old 
value for the next iteration
+                oldValue = syncs[current];
+                syncs[current] = replacement;
+
+                assert invariantB(syncs[previous], syncs[current], previous, 
current);
+                assert invariantC(syncs[previous], syncs[current], previous, 
current);
+            }
+        }
+    }
+
+    private boolean invariantB(OffsetSync iSync, OffsetSync jSync, int i, int 
j) {
+        long bound = jSync.upstreamOffset() + (1L << j) - (1L << i);
+        return iSync == jSync || bound < 0 || iSync.upstreamOffset() <= bound;
+    }
+
+    private boolean invariantC(OffsetSync iSync, OffsetSync jSync, int i, int 
j) {
+        long bound = jSync.upstreamOffset() + (1L << Math.max(i - 2, 0));

Review Comment:
   On this side, overflowing the bound is different, because this is a lower 
bound. If the bound overflows, no offset sync can satisfy the invariant: the 
true (un-wrapped) bound exceeds Long.MAX_VALUE, so it is always higher than any 
representable iSync.upstreamOffset, even though the wrapped value is negative. 
I'll update this condition to compensate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to