gharris1727 commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1149859768
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ########## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord<byte[], byte[]> record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); - offsetSyncs.put(sourceTopicPartition, offsetSync); + offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); + offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); + } + + private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { + // Make a copy of the array before mutating it, so that readers do not see inconsistent data + // TODO: batch updates so that this copy can be performed less often for high-volume sync topics. + OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); + updateSyncArray(mutableSyncs, offsetSync); + if (log.isTraceEnabled()) { + StringBuilder stateString = new StringBuilder(); + stateString.append("["); + for (int i = 0; i < Long.SIZE; i++) { + if (i != 0) { + stateString.append(","); + } + if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) { + // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,, + stateString.append(mutableSyncs[i].upstreamOffset()); + stateString.append(":"); + stateString.append(mutableSyncs[i].downstreamOffset()); + } + } + stateString.append("]"); + log.trace("New sync {} applied, new state is {}", offsetSync, stateString); Review Comment: I'll keep the if guard and the log in the method itself, but extract the stringifying to a new method. 
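A minimal sketch of what that extraction could look like. The helper name `offsetArrayToString` and the stand-in `Sync` class are assumptions made for illustration and are not taken from the PR; the formatting follows the loop in the hunk above, except that it iterates over `syncs.length` instead of `Long.SIZE` so it can be tried on a short array.

```java
import java.util.Arrays;

final class SyncStateFormatter {
    // Hypothetical stand-in for OffsetSync, reduced to the two offsets that the trace log prints.
    static final class Sync {
        final long upstreamOffset;
        final long downstreamOffset;

        Sync(long upstreamOffset, long downstreamOffset) {
            this.upstreamOffset = upstreamOffset;
            this.downstreamOffset = downstreamOffset;
        }
    }

    // Hypothetical helper name: builds the state string that the trace log would print.
    static String offsetArrayToString(Sync[] syncs) {
        StringBuilder stateString = new StringBuilder("[");
        for (int i = 0; i < syncs.length; i++) {
            if (i != 0) {
                stateString.append(",");
            }
            // Print the first entry, the last entry, and any entry that differs from its predecessor.
            // A run of repeated syncs therefore shows up as a run of bare commas.
            if (i == 0 || i == syncs.length - 1 || syncs[i] != syncs[i - 1]) {
                stateString.append(syncs[i].upstreamOffset);
                stateString.append(":");
                stateString.append(syncs[i].downstreamOffset);
            }
        }
        return stateString.append("]").toString();
    }

    public static void main(String[] args) {
        Sync older = new Sync(0, 0);
        Sync newer = new Sync(16, 116);
        Sync[] syncs = new Sync[Long.SIZE];
        Arrays.fill(syncs, older);
        Arrays.fill(syncs, 0, 5, newer); // the five finest buckets hold the newer sync
        System.out.println(offsetArrayToString(syncs));
    }
}
```

With this shape, the caller would keep the `log.isTraceEnabled()` guard and the `log.trace(...)` call, and only the string building moves out.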
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ########## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord<byte[], byte[]> record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); - offsetSyncs.put(sourceTopicPartition, offsetSync); + offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); + offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); + } + + private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { + // Make a copy of the array before mutating it, so that readers do not see inconsistent data + // TODO: batch updates so that this copy can be performed less often for high-volume sync topics. Review Comment: I'm not sure that this is a substantial improvement. Assuming a topic name of n bytes, batching would save an 8*64 = 512-byte allocation and copy per sync, which is paid on top of the existing ~ 32 + 2n bytes that deserializing an offset sync already allocates. I'll make the comment less committal. 
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ########## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord<byte[], byte[]> record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); - offsetSyncs.put(sourceTopicPartition, offsetSync); + offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); + offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); + } + + private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { + // Make a copy of the array before mutating it, so that readers do not see inconsistent data + // TODO: batch updates so that this copy can be performed less often for high-volume sync topics. + OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); + updateSyncArray(mutableSyncs, offsetSync); + if (log.isTraceEnabled()) { + StringBuilder stateString = new StringBuilder(); + stateString.append("["); + for (int i = 0; i < Long.SIZE; i++) { + if (i != 0) { + stateString.append(","); + } + if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) { + // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,, + stateString.append(mutableSyncs[i].upstreamOffset()); + stateString.append(":"); + stateString.append(mutableSyncs[i].downstreamOffset()); + } + } + stateString.append("]"); + log.trace("New sync {} applied, new state is {}", offsetSync, stateString); + } + return mutableSyncs; + } + + private OffsetSync[] createInitialSyncs(OffsetSync firstSync) { + OffsetSync[] syncs = new OffsetSync[Long.SIZE]; + clearSyncArray(syncs, firstSync); + return syncs; + } + + private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { + for (int i = 0; i < Long.SIZE; i++) { + syncs[i] = offsetSync; + } + } + + private void 
updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { + long upstreamOffset = offsetSync.upstreamOffset(); + // Old offsets are invalid, so overwrite them all. + if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) { + clearSyncArray(syncs, offsetSync); + return; + } + syncs[0] = offsetSync; + for (int i = 1; i < Long.SIZE; i++) { + OffsetSync oldValue = syncs[i]; + long mask = Long.MAX_VALUE << i; + // If the old value is too stale: at least one of the 64-i high bits of the offset value have changed + // This produces buckets quantized at boundaries of powers of 2 + // Syncs: a b c d e + // Bucket 0 |a| |b| |c| |d| |e| (size 1) + // Bucket 1 | a| | b| |c | |d | e| (size 2) + // Bucket 2 | a | | b| | c | | d | | (size 4) + // Bucket 3 | a | | b | | c | d | (size 8) + // Bucket 4 | a | b | c | (size 16) + // Bucket 5 | a | c | (size 32) + // Bucket 6 | a | (size 64) + // ... a ... + // Bucket63 a (size 2^63) + // State after a: [a,,,,,,, ... ,a] (all buckets written) + // State after b: [b,,,,,a,, ... ,a] (buckets 0-4 written) + // State after c: [c,,,,,,a, ... ,a] (buckets 0-5 written, b expired completely) + // State after d: [d,,,,c,,a, ... ,a] (buckets 0-3 written) + // State after e: [e,,d,,c,,a, ... ,a] (buckets 0-1 written) Review Comment: I need to re-address the invariants as they really aren't strong enough or have explanatory power. I think including them here would be a good idea. I also felt strange putting them up in the class javadoc, maybe they belong down here instead. 
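To make the bucket diagram above concrete, here is a small sketch of the update rule as the code comment describes it. It is a simplification under stated assumptions: each bucket holds only an upstream offset (a `long`) instead of an `OffsetSync`, the `readToEnd` check is left out, and the "stop at the first bucket that is still fresh" behaviour is inferred from the example states in the comment, since the rest of the loop is not shown in this hunk. The class and method names are hypothetical.

```java
import java.util.Arrays;

final class SyncBuckets {
    // All 64 buckets start out holding the first sync ("State after a").
    static long[] create(long firstUpstreamOffset) {
        long[] syncs = new long[Long.SIZE];
        Arrays.fill(syncs, firstUpstreamOffset);
        return syncs;
    }

    // Assumed update rule: bucket i is overwritten when any of the 64-i high bits of the
    // upstream offset differ between the stored value and the new value.
    static void update(long[] syncs, long upstreamOffset) {
        if (syncs[0] > upstreamOffset) {
            // Upstream offsets went backwards: old offsets are invalid, so overwrite them all.
            Arrays.fill(syncs, upstreamOffset);
            return;
        }
        syncs[0] = upstreamOffset;
        for (int i = 1; i < Long.SIZE; i++) {
            long mask = Long.MAX_VALUE << i; // clears the i low bits
            if (((syncs[i] ^ upstreamOffset) & mask) == 0L) {
                break; // this bucket is still fresh enough; coarser buckets are left alone
            }
            syncs[i] = upstreamOffset;
        }
    }

    public static void main(String[] args) {
        long[] syncs = create(0);                       // a = 0
        for (long offset : new long[] {16, 32, 40, 42}) { // b, c, d, e
            update(syncs, offset);
        }
        // prints [42, 42, 40, 40, 32, 32, 0, 0], i.e. [e,,d,,c,,a,...]
        System.out.println(Arrays.toString(Arrays.copyOf(syncs, 8)));
    }
}
```

With these offsets, b writes buckets 0-4, c writes buckets 0-5 (expiring b completely), d writes buckets 0-3, and e writes buckets 0-1, which matches the states listed in the comment.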
########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ########## @@ -57,52 +57,96 @@ public void testOffsetTranslation() { // Emit synced downstream offset without dead-reckoning store.sync(tp, 100, 200); - assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150)); + assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150)); // Translate exact offsets store.sync(tp, 150, 251); - assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150)); + assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150)); // Use old offset (5) prior to any sync -> can't translate - assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5)); + assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5)); // Downstream offsets reset store.sync(tp, 200, 10); - assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200)); + assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200)); // Upstream offsets reset store.sync(tp, 20, 20); - assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20)); + assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20)); } } @Test public void testNoTranslationIfStoreNotStarted() { try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { // no offsets exist and store is not started - assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0)); - assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100)); - assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200)); // read a sync during startup store.sync(tp, 100, 200); - assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0)); - 
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100)); - assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200)); // After the store is started all offsets are visible store.start(); - assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0)); - assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100)); - assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200)); + assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0)); + assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100)); + assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200)); } } @Test public void testNoTranslationIfNoOffsetSync() { try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { store.start(); - assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); } } + + @Test + public void testPastOffsetTranslation() { + try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { + long maxOffsetLag = 10; + int offset = 0; + for (; offset <= 1000; offset += maxOffsetLag) { + store.sync(tp, offset, offset); + } + store.start(); + + // After starting but before seeing new offsets, only the latest startup offset can be translated + assertSparseSync(store, 1000, -1); + + for (; offset <= 2000; offset += maxOffsetLag) { + store.sync(tp, offset, offset); + } + + // After seeing new offsets, we still cannot translate earlier than the latest startup offset + assertSparseSync(store, 1000, -1); + + // We can translate offsets between the latest startup offset and the latest offset with variable precision + // Older offsets are less precise and translation ends 
up farther apart + assertSparseSync(store, 1030, 1000); + assertSparseSync(store, 1540, 1030); + assertSparseSync(store, 1800, 1540); + assertSparseSync(store, 1920, 1800); + assertSparseSync(store, 1990, 1920); + assertSparseSync(store, 2000, 1990);
Review Comment:
> Where do the argument values for these calls come from?

I agree that these numbers are quite opaque looking at them again. Each one is the next multiple of 10 at or after an offset which is very divisible by 2:
```
1030 -> 1024 -> 2^10
1540 -> 1536 -> 2^10 + 2^9
1800 -> 1792 -> 2^10 + 2^9 + 2^8
1920 -> 1920 -> 2^10 + 2^9 + 2^8 + 2^7
1990 -> 1984 -> 2^10 + 2^9 + 2^8 + 2^7 + 2^6
2000 -> 2000 -> 2^10 + 2^9 + 2^8 + 2^7 + 2^6 + 2^4
```
They are multiples of 10 because of the maxOffsetLag = 10, which does not know anything about this scheme and instead only emits syncs every 10 offsets. Also note that the pattern skips 2^5 = 32 because 1984 + 32 = 2016, which is beyond the end of the test.

> Separately but possibly related, would it help to have a method that ensures that the invariants for the syncs stored by the OffsetSyncStore hold?

Thanks for the new assertions! I've added them to the test as you described.
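The arithmetic behind those test arguments can be reproduced with a few lines of Java. This is only a sketch of the reasoning in the comment, not code from the PR or the way the store itself computes anything: the class name, method name, and parameters are hypothetical. It greedily adds decreasing powers of two while staying at or below the last synced offset, then rounds each boundary up to the next offset at which a sync is emitted.

```java
import java.util.ArrayList;
import java.util.List;

final class SparseSyncOffsets {
    // Hypothetical helper: lastOffset is the final synced offset in the test (2000),
    // maxOffsetLag is the spacing between emitted syncs (10), highestBit is the largest
    // power of two that fits below lastOffset (10, since 2^10 = 1024).
    static List<Long> expected(long lastOffset, long maxOffsetLag, int highestBit) {
        List<Long> result = new ArrayList<>();
        long boundary = 0;
        for (int bit = highestBit; bit >= 0; bit--) {
            long candidate = boundary + (1L << bit);
            if (candidate > lastOffset) {
                continue; // e.g. 1984 + 32 = 2016 is beyond the end of the test
            }
            boundary = candidate;
            // Round up to the next multiple of maxOffsetLag, where a sync actually exists.
            long rounded = ((boundary + maxOffsetLag - 1) / maxOffsetLag) * maxOffsetLag;
            result.add(rounded);
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [1030, 1540, 1800, 1920, 1990, 2000]
        System.out.println(expected(2000, 10, 10));
    }
}
```

The six values are the first arguments of the six `assertSparseSync` calls in the hunk, in order.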
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ########## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord<byte[], byte[]> record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); - offsetSyncs.put(sourceTopicPartition, offsetSync); + offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); + offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); + } + + private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { + // Make a copy of the array before mutating it, so that readers do not see inconsistent data + // TODO: batch updates so that this copy can be performed less often for high-volume sync topics. + OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); + updateSyncArray(mutableSyncs, offsetSync); + if (log.isTraceEnabled()) { + StringBuilder stateString = new StringBuilder(); + stateString.append("["); + for (int i = 0; i < Long.SIZE; i++) { + if (i != 0) { + stateString.append(","); + } + if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) { + // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,, Review Comment: Hmm, I'm on the fence. On one hand, if you eliminate the intervening commas, the log message becomes ambiguous as you don't know exactly what the state of the array is. Currently, you can reconstruct the array exactly from the log message. However, eliminating the commas does not eliminate any information which would make _translation_ ambiguous, since all of the repeated elements have no effect. And because the print includes the offsets, you can infer by bit-prefix which elements appear where. If there's ever a bug in the bit masking, the commas would help figure that out. 
But this log message isn't going to be used by people debugging the bit masking, it's going to be used by people debugging their offset translation. I think it is safe to optimize for those users instead of for myself :) ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ########## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord<byte[], byte[]> record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); - offsetSyncs.put(sourceTopicPartition, offsetSync); + offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); + offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); + } + + private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { + // Make a copy of the array before mutating it, so that readers do not see inconsistent data + // TODO: batch updates so that this copy can be performed less often for high-volume sync topics. 
+ OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); + updateSyncArray(mutableSyncs, offsetSync); + if (log.isTraceEnabled()) { + StringBuilder stateString = new StringBuilder(); + stateString.append("["); + for (int i = 0; i < Long.SIZE; i++) { + if (i != 0) { + stateString.append(","); + } + if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) { + // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,, + stateString.append(mutableSyncs[i].upstreamOffset()); + stateString.append(":"); + stateString.append(mutableSyncs[i].downstreamOffset()); + } + } + stateString.append("]"); + log.trace("New sync {} applied, new state is {}", offsetSync, stateString); + } + return mutableSyncs; + } + + private OffsetSync[] createInitialSyncs(OffsetSync firstSync) { + OffsetSync[] syncs = new OffsetSync[Long.SIZE]; + clearSyncArray(syncs, firstSync); + return syncs; + } + + private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { + for (int i = 0; i < Long.SIZE; i++) { + syncs[i] = offsetSync; + } + } + + private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { + long upstreamOffset = offsetSync.upstreamOffset(); + // Old offsets are invalid, so overwrite them all. + if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) { Review Comment: I needed a way to guarantee that if the task translates some offset, that it's at least as far along in the topic as the previous checkpoint. For two checkpoints emitted in the same task lifetime, the in-memory state in MirrorCheckpointTask is good enough to provide this guarantee. If we restart the source task it will re-read the offset syncs topic and could potentially reconstruct a different in-memory state, both in the MirrorCheckpointTask and the OffsetSyncStore. 
By discarding all of the earlier syncs, we ensure that the earliest sync we have in-memory has a higher offset than every previous offset sync that the previous lifetime of the OffsetSyncStore could have had.

Consider the following situation:
1. An upstream consumer group is close to the replication flow, everything is in steady state. The offset syncs array contains syncs A (oldest), B, and C (newest)
2. A checkpoint is emitted which uses a fine grained offset sync C.
3. The upstream consumer group stops consuming.
4. MirrorSourceTask emits offset sync D, and the MirrorCheckpointTask discards offset sync C because it is of similar precision.
5. The OffsetSyncStore translates the consumer group relative to offset sync B, but the MirrorCheckpointTask drops this checkpoint because it remembers that it previously translated relative to C.
6. The MirrorCheckpointTask restarts, and in-memory checkpoints are cleared and offsets are restored.
7. The MirrorCheckpointTask now tries to perform a checkpoint.

With the current `!readToEnd` condition, only offset sync D is in-memory, and translation is skipped because the consumer group is behind the latest restart.

If we remove the `!readToEnd` condition, the OffsetSyncStore translates the consumer group relative to offset sync B, and the MirrorCheckpointTask emits the checkpoint because it does not remember the previous checkpoint emitted. Thus it emitted a checkpoint computed by C, and then B, which was non-monotonic.

I mentioned in a comment on KAFKA-14666 that we would have to re-read the checkpoints topic for arbitrarily old translation, and this is why. It can be done, it just represented a new complexity that I wanted to avoid.
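The restart problem in that scenario can be shown with a toy model. Everything below is hypothetical and is not the MirrorCheckpointTask implementation: `CheckpointGuard`, `tryEmit`, the group name, and the downstream offsets 300 (translated via sync C) and 200 (translated via sync B) are made up for illustration. The only point is that a monotonicity check backed by in-memory state stops protecting you once that state is rebuilt.

```java
import java.util.HashMap;
import java.util.Map;

final class CheckpointGuard {
    // In-memory only: this map is empty again after a task restart.
    private final Map<String, Long> lastEmitted = new HashMap<>();

    // Returns true if the checkpoint is emitted, false if it is dropped as a rewind.
    boolean tryEmit(String group, long downstreamOffset) {
        Long previous = lastEmitted.get(group);
        if (previous != null && downstreamOffset < previous) {
            return false;
        }
        lastEmitted.put(group, downstreamOffset);
        return true;
    }

    public static void main(String[] args) {
        CheckpointGuard firstLifetime = new CheckpointGuard();
        System.out.println(firstLifetime.tryEmit("group", 300)); // via sync C: true
        System.out.println(firstLifetime.tryEmit("group", 200)); // via sync B: false, dropped

        // A restart loses the remembered checkpoint, so the same rewind now goes through.
        CheckpointGuard afterRestart = new CheckpointGuard();
        System.out.println(afterRestart.tryEmit("group", 200));  // true: non-monotonic overall
    }
}
```

In this toy model the rewind is only prevented if the store never offers the older translation after a restart, which is the role the `!readToEnd` condition plays in the explanation above, or if the guard's state is rebuilt from the checkpoints topic.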
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ########## @@ -25,17 +25,37 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.util.KafkaBasedLog; import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.ConcurrentHashMap; -/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */ +/** + * Used internally by MirrorMaker. Stores offset syncs and performs offset translation. + * <p>A limited number of offset syncs can be stored per TopicPartition, in a way which provides better translation + * later in the topic, closer to the live end of the topic. + * This maintains the following invariants for each topic-partition in the in-memory sync storage: + * <ul> + * <li>syncs[0] is the latest offset sync from the syncs topic</li> + * <li>For each i,j, i <= j: syncs[j].upstream <= syncs[i].upstream < syncs[j].upstream + 2^j</li> + * </ul> + * <p>Offset translation uses the syncs[i] which most closely precedes the upstream consumer group's current offset. + * For a fixed in-memory state, translation of variable upstream offsets will be monotonic. + * For variable in-memory state, translation of a fixed upstream offset will not be monotonic. + * <p>Translation will be unavailable for all topic-partitions before an initial read-to-end of the offset syncs topic + * is complete. Translation will be unavailable after that if no syncs are present for a topic-partition, or if relevant + * offset syncs for the topic were eligible for compaction at the time of the initial read-to-end. 
Review Comment: > it's also possible that replication began from a later point than the consumer group is reading from, which is also a possible reason that translation may be unavailable. Oh, and I guess there's the case where replication began, but was unable to produce offset syncs for whatever reason Thanks for pointing that out, I missed those conditions. > does it cover the case where syncs for earlier offsets have been wiped due to compaction? If so, we may want to clarify that offset syncs were "lost due to compaction" instead of "eligible for compaction". It does cover the cases where syncs are lost to compaction, but eligible for compaction includes even more records. This is alluding to the `!readToEnd` condition that you pointed out in another comment.