gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1176247653


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +180,141 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) ->
+                syncs == null ? createInitialSyncs(offsetSync) : 
updateExistingSyncs(syncs, offsetSync)
+        );
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+        // TODO: consider batching updates so that this copy can be performed 
less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            log.trace("New sync {} applied, new state is {}", offsetSync, 
offsetArrayToString(mutableSyncs));
+        }
+        return mutableSyncs;
+    }
+
+    private String offsetArrayToString(OffsetSync[] syncs) {
+        StringBuilder stateString = new StringBuilder();
+        stateString.append("[");
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            if (i == 0 || syncs[i] != syncs[i - 1]) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                // Print only if the sync is interesting, a series of repeated 
syncs will be elided
+                stateString.append(syncs[i].upstreamOffset());
+                stateString.append(":");
+                stateString.append(syncs[i].downstreamOffset());
+            }
+        }
+        stateString.append("]");
+        return stateString.toString();
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // If every element of the store is the same, then it satisfies 
invariants B and C trivially.
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        OffsetSync replacement = offsetSync;
+        OffsetSync oldValue = syncs[0];
+        // Invariant A is always violated once a new sync appears.
+        // Repair Invariant A: the latest sync must always be updated
+        syncs[0] = replacement;
+        for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
+            int previous = current - 1;
+
+            // Consider using oldValue instead of replacement, which allows us 
to keep more distinct values stored
+            // If oldValue is not recent, it should be expired from the store
+            boolean isRecent = invariantB(syncs[previous], oldValue, previous, 
current);
+            // Ensure that this value is sufficiently separated from the 
previous value
+            // We prefer to keep more recent syncs of similar precision (i.e. 
the value in replacement)
+            boolean separatedFromPrevious = invariantC(syncs[previous], 
oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the next 
value
+            // We prefer to keep existing syncs of lower precision (i.e. the 
value in syncs[next])
+            int next = current + 1;
+            boolean separatedFromNext = next >= SYNCS_PER_PARTITION || 
invariantC(oldValue, syncs[next], current, next);
+            // If this condition is false, oldValue will be expired from the 
store and lost forever.
+            if (isRecent && separatedFromPrevious && separatedFromNext) {
+                replacement = oldValue;
+            }
+
+            // The replacement variable always contains a value which 
satisfies the invariants for this index.
+            assert invariantB(syncs[previous], replacement, previous, current);
+            assert invariantC(syncs[previous], replacement, previous, current);
+
+            // Test if changes to the previous index affected the invariant 
for this index
+            if (invariantB(syncs[previous], syncs[current], previous, 
current)) {
+                // Invariant B holds for syncs[current]: it must also hold for 
all later values

Review Comment:
   There are two reasons for this. The first is that, because we're not changing the value of syncs[current], the tail of the array (from current through the end) is the same as it was upon entering updateSyncArray. The invariants therefore already hold for it, because either createInitialSyncs or a previous call to updateSyncArray constructed it to satisfy them.
   
   For example, say that we're in the first call to updateSyncArray after createInitialSyncs, so the tail of the array has the same contents as syncs[current]. Because syncs[current] is valid in position current, it will also be valid for all of the later indexes, since the invariants get more relaxed as the index `j` increases.
   
   The other reason is the property mentioned in the javadoc:
   
   > They can be checked locally (by comparing all adjacent indexes) but hold globally (for all pairs of any distance).
   
   For invariant B, this can be proved as follows:
   * Let i < j < k
   * Assume `invariantB(i, j) && invariantB(j, k)` is true.
   * `syncs[i].upstream <= syncs[j].upstream + 2^j - 2^i && syncs[j].upstream 
<= syncs[k].upstream + 2^k - 2^j`, by definition of invariant B
   * `syncs[i].upstream <= (syncs[k].upstream + 2^k - 2^j) + 2^j - 2^i` by 
substitution
   * `syncs[i].upstream <= syncs[k].upstream + 2^k - 2^i` by simplification.
   * `invariantB(i, k)` is true by the definition of invariantB
   
   This means that checking `invariantB(i, i+1)` for every adjacent pair in the array tells us that the whole array satisfies the invariant and has global structure. That includes `invariantB(previous, current)` in this call and `invariantB(current, next)` in a previous call.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to