gharris1727 commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1176247653
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ########## @@ -139,10 +180,141 @@ public void close() { protected void handleRecord(ConsumerRecord<byte[], byte[]> record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); - offsetSyncs.put(sourceTopicPartition, offsetSync); + offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> + syncs == null ? createInitialSyncs(offsetSync) : updateExistingSyncs(syncs, offsetSync) + ); + } + + private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { + // Make a copy of the array before mutating it, so that readers do not see inconsistent data + // TODO: consider batching updates so that this copy can be performed less often for high-volume sync topics. + OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION); + updateSyncArray(mutableSyncs, offsetSync); + if (log.isTraceEnabled()) { + log.trace("New sync {} applied, new state is {}", offsetSync, offsetArrayToString(mutableSyncs)); + } + return mutableSyncs; + } + + private String offsetArrayToString(OffsetSync[] syncs) { + StringBuilder stateString = new StringBuilder(); + stateString.append("["); + for (int i = 0; i < SYNCS_PER_PARTITION; i++) { + if (i == 0 || syncs[i] != syncs[i - 1]) { + if (i != 0) { + stateString.append(","); + } + // Print only if the sync is interesting, a series of repeated syncs will be elided + stateString.append(syncs[i].upstreamOffset()); + stateString.append(":"); + stateString.append(syncs[i].downstreamOffset()); + } + } + stateString.append("]"); + return stateString.toString(); + } + + private OffsetSync[] createInitialSyncs(OffsetSync firstSync) { + OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION]; + clearSyncArray(syncs, firstSync); + return syncs; + } + + private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { + // If every element of the 
store is the same, then it satisfies invariants B and C trivially. + for (int i = 0; i < SYNCS_PER_PARTITION; i++) { + syncs[i] = offsetSync; + } + } + + private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { + long upstreamOffset = offsetSync.upstreamOffset(); + // Old offsets are invalid, so overwrite them all. + if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) { + clearSyncArray(syncs, offsetSync); + return; + } + OffsetSync replacement = offsetSync; + OffsetSync oldValue = syncs[0]; + // Invariant A is always violated once a new sync appears. + // Repair Invariant A: the latest sync must always be updated + syncs[0] = replacement; + for (int current = 1; current < SYNCS_PER_PARTITION; current++) { + int previous = current - 1; + + // Consider using oldValue instead of replacement, which allows us to keep more distinct values stored + // If oldValue is not recent, it should be expired from the store + boolean isRecent = invariantB(syncs[previous], oldValue, previous, current); + // Ensure that this value is sufficiently separated from the previous value + // We prefer to keep more recent syncs of similar precision (i.e. the value in replacement) + boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous, current); + // Ensure that this value is sufficiently separated from the next value + // We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next]) + int next = current + 1; + boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current, next); + // If this condition is false, oldValue will be expired from the store and lost forever. + if (isRecent && separatedFromPrevious && separatedFromNext) { + replacement = oldValue; + } + + // The replacement variable always contains a value which satisfies the invariants for this index. 
+ assert invariantB(syncs[previous], replacement, previous, current); + assert invariantC(syncs[previous], replacement, previous, current); + + // Test if changes to the previous index affected the invariant for this index + if (invariantB(syncs[previous], syncs[current], previous, current)) { + // Invariant B holds for syncs[current]: it must also hold for all later values Review Comment: There are two levels to this. The first is that, because we're not changing the value of syncs[current], the tail of the array (current through the end) is the same as it was upon entering updateSyncArray, and the invariants already hold for it, because either createInitialSyncs or a previous call to updateSyncArray constructed it to satisfy them. For example, say that we're in the first call to updateSyncArray after createInitialSyncs, so every element in the tail of the array is the same as syncs[current]. Because syncs[current] is valid at position current, it is also valid at all of the later indexes, since the invariants become more relaxed as `j` increases. The second is the property mentioned in the javadoc: > They can be checked locally (by comparing all adjacent indexes) but hold globally (for all pairs of any distance). This can be proved as follows: * Let i < j < k. * Assume `invariantB(i, j) && invariantB(j, k)` is true. * `syncs[i].upstream <= syncs[j].upstream + 2^j - 2^i && syncs[j].upstream <= syncs[k].upstream + 2^k - 2^j`, by the definition of invariant B. * `syncs[i].upstream <= (syncs[k].upstream + 2^k - 2^j) + 2^j - 2^i`, by substitution. * `syncs[i].upstream <= syncs[k].upstream + 2^k - 2^i`, by simplification. * `invariantB(i, k)` is true, by the definition of invariant B. This means that checking `invariantB(i, i+1)` across the whole array (including `invariantB(previous, current)` in this call and `invariantB(current, next)` in a previous call) lets us conclude that the whole array satisfies the invariants and has global structure.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org