[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication
gharris1727 commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1176536050 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ## @@ -57,52 +58,170 @@ public void testOffsetTranslation() { // Emit synced downstream offset without dead-reckoning store.sync(tp, 100, 200); -assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150)); +assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150)); // Translate exact offsets store.sync(tp, 150, 251); -assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150)); +assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150)); // Use old offset (5) prior to any sync -> can't translate -assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5)); +assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5)); // Downstream offsets reset store.sync(tp, 200, 10); -assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200)); +assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200)); // Upstream offsets reset store.sync(tp, 20, 20); -assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20)); +assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20)); } } @Test public void testNoTranslationIfStoreNotStarted() { try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { // no offsets exist and store is not started -assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0)); -assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100)); -assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200)); // read a sync during startup store.sync(tp, 100, 200); 
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0)); -assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100)); -assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200)); // After the store is started all offsets are visible store.start(); -assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0)); -assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100)); -assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200)); +assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0)); +assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100)); +assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200)); } } @Test public void testNoTranslationIfNoOffsetSync() { try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { store.start(); -assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); +} +} + +@Test +public void testPastOffsetTranslation() { +try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { +long maxOffsetLag = 10; +int offset = 0; +for (; offset <= 1000; offset += maxOffsetLag) { +store.sync(tp, offset, offset); +assertSparseSyncInvariant(store, tp); +} +store.start(); + +// After starting but before seeing new offsets, only the latest startup offset can be translated +assertSparseSync(store, 1000, -1); + +for (; offset <= 1; offset += maxOffsetLag) { +store.sync(tp, offset, offset); +assertSparseSyncInvariant(store, tp); +} + +// After seeing new offsets, we still cannot translate earlier than the latest startup offset +// Invariant D: the last sync from the initial read-to-end is still stored 
+assertSparseSync(store, 1000, -1); + +// We can translate offsets between
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication
gharris1727 commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1176247653 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -139,10 +180,141 @@ public void close() { protected void handleRecord(ConsumerRecord record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); -offsetSyncs.put(sourceTopicPartition, offsetSync); +offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> +syncs == null ? createInitialSyncs(offsetSync) : updateExistingSyncs(syncs, offsetSync) +); +} + +private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { +// Make a copy of the array before mutating it, so that readers do not see inconsistent data +// TODO: consider batching updates so that this copy can be performed less often for high-volume sync topics. +OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION); +updateSyncArray(mutableSyncs, offsetSync); +if (log.isTraceEnabled()) { +log.trace("New sync {} applied, new state is {}", offsetSync, offsetArrayToString(mutableSyncs)); +} +return mutableSyncs; +} + +private String offsetArrayToString(OffsetSync[] syncs) { +StringBuilder stateString = new StringBuilder(); +stateString.append("["); +for (int i = 0; i < SYNCS_PER_PARTITION; i++) { +if (i == 0 || syncs[i] != syncs[i - 1]) { +if (i != 0) { +stateString.append(","); +} +// Print only if the sync is interesting, a series of repeated syncs will be elided +stateString.append(syncs[i].upstreamOffset()); +stateString.append(":"); +stateString.append(syncs[i].downstreamOffset()); +} +} +stateString.append("]"); +return stateString.toString(); +} + +private OffsetSync[] createInitialSyncs(OffsetSync firstSync) { +OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION]; +clearSyncArray(syncs, firstSync); +return syncs; +} + +private void clearSyncArray(OffsetSync[] syncs, OffsetSync 
offsetSync) { +// If every element of the store is the same, then it satisfies invariants B and C trivially. +for (int i = 0; i < SYNCS_PER_PARTITION; i++) { +syncs[i] = offsetSync; +} +} + +private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { +long upstreamOffset = offsetSync.upstreamOffset(); +// Old offsets are invalid, so overwrite them all. +if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) { +clearSyncArray(syncs, offsetSync); +return; +} +OffsetSync replacement = offsetSync; +OffsetSync oldValue = syncs[0]; +// Invariant A is always violated once a new sync appears. +// Repair Invariant A: the latest sync must always be updated +syncs[0] = replacement; +for (int current = 1; current < SYNCS_PER_PARTITION; current++) { +int previous = current - 1; + +// Consider using oldValue instead of replacement, which allows us to keep more distinct values stored +// If oldValue is not recent, it should be expired from the store +boolean isRecent = invariantB(syncs[previous], oldValue, previous, current); +// Ensure that this value is sufficiently separated from the previous value +// We prefer to keep more recent syncs of similar precision (i.e. the value in replacement) +boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous, current); +// Ensure that this value is sufficiently separated from the next value +// We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next]) +int next = current + 1; +boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current, next); +// If this condition is false, oldValue will be expired from the store and lost forever. +if (isRecent && separatedFromPrevious && separatedFromNext) { +replacement = oldValue; +} + +// The replacement variable always contains a value which satisfies the invariants for this index. 
+assert invariantB(syncs[previous], replacement, previous, current); +assert invariantC(syncs[previous], replacement, previous, current); + +// Test if changes to the previous index affected the invariant for this index +if (invariantB(syncs[previous], syncs[current], previous, current)) { +
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication
gharris1727 commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1176205390 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -139,10 +180,141 @@ public void close() { protected void handleRecord(ConsumerRecord record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); -offsetSyncs.put(sourceTopicPartition, offsetSync); +offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> +syncs == null ? createInitialSyncs(offsetSync) : updateExistingSyncs(syncs, offsetSync) +); +} + +private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { +// Make a copy of the array before mutating it, so that readers do not see inconsistent data +// TODO: consider batching updates so that this copy can be performed less often for high-volume sync topics. +OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION); +updateSyncArray(mutableSyncs, offsetSync); +if (log.isTraceEnabled()) { +log.trace("New sync {} applied, new state is {}", offsetSync, offsetArrayToString(mutableSyncs)); +} +return mutableSyncs; +} + +private String offsetArrayToString(OffsetSync[] syncs) { +StringBuilder stateString = new StringBuilder(); +stateString.append("["); +for (int i = 0; i < SYNCS_PER_PARTITION; i++) { +if (i == 0 || syncs[i] != syncs[i - 1]) { +if (i != 0) { +stateString.append(","); +} +// Print only if the sync is interesting, a series of repeated syncs will be elided +stateString.append(syncs[i].upstreamOffset()); +stateString.append(":"); +stateString.append(syncs[i].downstreamOffset()); +} +} +stateString.append("]"); +return stateString.toString(); +} + +private OffsetSync[] createInitialSyncs(OffsetSync firstSync) { +OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION]; +clearSyncArray(syncs, firstSync); +return syncs; +} + +private void clearSyncArray(OffsetSync[] syncs, OffsetSync 
offsetSync) { +// If every element of the store is the same, then it satisfies invariants B and C trivially. +for (int i = 0; i < SYNCS_PER_PARTITION; i++) { +syncs[i] = offsetSync; +} +} + +private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { +long upstreamOffset = offsetSync.upstreamOffset(); +// Old offsets are invalid, so overwrite them all. +if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) { +clearSyncArray(syncs, offsetSync); +return; +} +OffsetSync replacement = offsetSync; +OffsetSync oldValue = syncs[0]; +// Invariant A is always violated once a new sync appears. +// Repair Invariant A: the latest sync must always be updated +syncs[0] = replacement; +for (int current = 1; current < SYNCS_PER_PARTITION; current++) { +int previous = current - 1; + +// Consider using oldValue instead of replacement, which allows us to keep more distinct values stored +// If oldValue is not recent, it should be expired from the store +boolean isRecent = invariantB(syncs[previous], oldValue, previous, current); +// Ensure that this value is sufficiently separated from the previous value +// We prefer to keep more recent syncs of similar precision (i.e. the value in replacement) +boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous, current); +// Ensure that this value is sufficiently separated from the next value +// We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next]) +int next = current + 1; +boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current, next); +// If this condition is false, oldValue will be expired from the store and lost forever. +if (isRecent && separatedFromPrevious && separatedFromNext) { +replacement = oldValue; +} + +// The replacement variable always contains a value which satisfies the invariants for this index. 
+assert invariantB(syncs[previous], replacement, previous, current); +assert invariantC(syncs[previous], replacement, previous, current); + +// Test if changes to the previous index affected the invariant for this index +if (invariantB(syncs[previous], syncs[current], previous, current)) { +
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication
gharris1727 commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1176202013 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -139,10 +180,141 @@ public void close() { protected void handleRecord(ConsumerRecord record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); -offsetSyncs.put(sourceTopicPartition, offsetSync); +offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> +syncs == null ? createInitialSyncs(offsetSync) : updateExistingSyncs(syncs, offsetSync) +); +} + +private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { +// Make a copy of the array before mutating it, so that readers do not see inconsistent data +// TODO: consider batching updates so that this copy can be performed less often for high-volume sync topics. +OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION); +updateSyncArray(mutableSyncs, offsetSync); +if (log.isTraceEnabled()) { +log.trace("New sync {} applied, new state is {}", offsetSync, offsetArrayToString(mutableSyncs)); +} +return mutableSyncs; +} + +private String offsetArrayToString(OffsetSync[] syncs) { +StringBuilder stateString = new StringBuilder(); +stateString.append("["); +for (int i = 0; i < SYNCS_PER_PARTITION; i++) { +if (i == 0 || syncs[i] != syncs[i - 1]) { +if (i != 0) { +stateString.append(","); +} +// Print only if the sync is interesting, a series of repeated syncs will be elided +stateString.append(syncs[i].upstreamOffset()); +stateString.append(":"); +stateString.append(syncs[i].downstreamOffset()); +} +} +stateString.append("]"); +return stateString.toString(); +} + +private OffsetSync[] createInitialSyncs(OffsetSync firstSync) { +OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION]; +clearSyncArray(syncs, firstSync); +return syncs; +} + +private void clearSyncArray(OffsetSync[] syncs, OffsetSync 
offsetSync) { +// If every element of the store is the same, then it satisfies invariants B and C trivially. +for (int i = 0; i < SYNCS_PER_PARTITION; i++) { +syncs[i] = offsetSync; +} +} + +private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { +long upstreamOffset = offsetSync.upstreamOffset(); +// Old offsets are invalid, so overwrite them all. +if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) { +clearSyncArray(syncs, offsetSync); +return; +} +OffsetSync replacement = offsetSync; +OffsetSync oldValue = syncs[0]; +// Invariant A is always violated once a new sync appears. +// Repair Invariant A: the latest sync must always be updated +syncs[0] = replacement; +for (int current = 1; current < SYNCS_PER_PARTITION; current++) { +int previous = current - 1; + +// Consider using oldValue instead of replacement, which allows us to keep more distinct values stored +// If oldValue is not recent, it should be expired from the store +boolean isRecent = invariantB(syncs[previous], oldValue, previous, current); +// Ensure that this value is sufficiently separated from the previous value +// We prefer to keep more recent syncs of similar precision (i.e. the value in replacement) +boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous, current); +// Ensure that this value is sufficiently separated from the next value +// We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next]) +int next = current + 1; +boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current, next); +// If this condition is false, oldValue will be expired from the store and lost forever. +if (isRecent && separatedFromPrevious && separatedFromNext) { +replacement = oldValue; +} + +// The replacement variable always contains a value which satisfies the invariants for this index. 
+assert invariantB(syncs[previous], replacement, previous, current); +assert invariantC(syncs[previous], replacement, previous, current); + +// Test if changes to the previous index affected the invariant for this index +if (invariantB(syncs[previous], syncs[current], previous, current)) { +
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication
gharris1727 commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1173998391 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); -offsetSyncs.put(sourceTopicPartition, offsetSync); +offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); +offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); +} + +private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { +// Make a copy of the array before mutating it, so that readers do not see inconsistent data +// TODO: batch updates so that this copy can be performed less often for high-volume sync topics. +OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); +updateSyncArray(mutableSyncs, offsetSync); +if (log.isTraceEnabled()) { +StringBuilder stateString = new StringBuilder(); +stateString.append("["); +for (int i = 0; i < Long.SIZE; i++) { +if (i != 0) { +stateString.append(","); +} +if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) { +// Print only if the sync is interesting, a series of repeated syncs will appear as , +stateString.append(mutableSyncs[i].upstreamOffset()); +stateString.append(":"); +stateString.append(mutableSyncs[i].downstreamOffset()); +} +} +stateString.append("]"); +log.trace("New sync {} applied, new state is {}", offsetSync, stateString); +} +return mutableSyncs; +} + +private OffsetSync[] createInitialSyncs(OffsetSync firstSync) { +OffsetSync[] syncs = new OffsetSync[Long.SIZE]; +clearSyncArray(syncs, firstSync); +return syncs; +} + +private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { +for (int i = 0; i < Long.SIZE; i++) { +syncs[i] = 
offsetSync; +} +} + +private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { +long upstreamOffset = offsetSync.upstreamOffset(); +// Old offsets are invalid, so overwrite them all. +if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) { Review Comment: @urbandan > Monotonicity is an optimization - it tries to minimize re-processing after a failover. (Please correct me if I'm wrong, but I don't really see any other user stories behind it.) Yes this is correct, but I think it misses some nuance. The overall feature (offset translation, checkpoints, syncing offsets to the consumer group) is an optimization: users could just replicate their data, and start reading the replica from the beginning. And relevant to your earlier simplicity argument: it would be simpler to not offer offset translation, but would require users to re-deliver more data. In the same way, monotonicity is an optimization to not rewind checkpoints when we have already translated better ones: We could choose to not offer monotonicity for the simplicity but (with this specific in-memory store implementation) that means that if you are N offsets behind the replication flow, your offset could fall backward by another N offsets, doubling the data re-delivery. I would also consider the user-story of someone monitoring their cluster, and noticing that despite consumers and replication moving forward, the checkpoints topic is moving _backwards_. Though no error has occurred, the result of the offset translation is getting worse. To someone unfamiliar with the internal workings of this algorithm, it looks like we're just generating arbitrary offsets. Actually we were already doing that before KAFKA-13659, and someone went through the effort to figure out why their offsets went backwards each time the connector restarted, and opened a ticket that multiple people voted on. 
I think you can make the argument that monotonicity is an optimization that we can drop for convenience, but that doesn't change the fact that users are expecting it without an explicit contract. > Being able to checkpoint old offsets after a restart is a feature, and probably a good one. If cluster restarts/rebalances are frequent enough, and some consumers are lagging behind consistently, they might never get their checkpoints translated, ever. I completely agree. I think that offset translation _must_ be extended to offer translation of very old
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication
gharris1727 commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1173998391 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); -offsetSyncs.put(sourceTopicPartition, offsetSync); +offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); +offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); +} + +private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { +// Make a copy of the array before mutating it, so that readers do not see inconsistent data +// TODO: batch updates so that this copy can be performed less often for high-volume sync topics. +OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); +updateSyncArray(mutableSyncs, offsetSync); +if (log.isTraceEnabled()) { +StringBuilder stateString = new StringBuilder(); +stateString.append("["); +for (int i = 0; i < Long.SIZE; i++) { +if (i != 0) { +stateString.append(","); +} +if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) { +// Print only if the sync is interesting, a series of repeated syncs will appear as , +stateString.append(mutableSyncs[i].upstreamOffset()); +stateString.append(":"); +stateString.append(mutableSyncs[i].downstreamOffset()); +} +} +stateString.append("]"); +log.trace("New sync {} applied, new state is {}", offsetSync, stateString); +} +return mutableSyncs; +} + +private OffsetSync[] createInitialSyncs(OffsetSync firstSync) { +OffsetSync[] syncs = new OffsetSync[Long.SIZE]; +clearSyncArray(syncs, firstSync); +return syncs; +} + +private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { +for (int i = 0; i < Long.SIZE; i++) { +syncs[i] = 
offsetSync; +} +} + +private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { +long upstreamOffset = offsetSync.upstreamOffset(); +// Old offsets are invalid, so overwrite them all. +if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) { Review Comment: @urbandan > Monotonicity is an optimization - it tries to minimize re-processing after a failover. (Please correct me if I'm wrong, but I don't really see any other user stories behind it.) Yes this is correct, but I think it misses some nuance. The overall feature (offset translation, checkpoints, syncing offsets to the consumer group) is an optimization: users could just replicate their data, and start reading the replica from the beginning. And relevant to your earlier simplicity argument: it would be simpler to not offer offset translation, but would require users to re-deliver more data. In the same way, monotonicity is an optimization to not rewind checkpoints when we have already translated better ones: We could choose to not offer monotonicity for the simplicity but (with this specific in-memory store implementation) that means that if you are N offsets behind the replication flow, your offset could fall backward by another N offsets, doubling the data re-delivery. I would also consider the user-story of someone monitoring their cluster, and noticing that despite consumers and replication moving forward, the checkpoints topic is moving _backwards_. Though no error has occurred, the result of the offset translation is getting worse. To someone unfamiliar with the internal workings of this algorithm, it looks like we're just generating arbitrary offsets. Actually we were already doing that before KAFKA-13659, and someone went through the effort to figure out why their offsets went backwards each time the connector restarted, and opened a ticket that multiple people voted on. 
I think you can make the argument that monotonicity is an optimization that we can drop for convenience, but I think users are expecting it based on the context of MM2 as a feature. > Being able to checkpoint old offsets after a restart is a feature, and probably a good one. If cluster restarts/rebalances are frequent enough, and some consumers are lagging behind consistently, they might never get their checkpoints translated, ever. I completely agree. I think that offset translation _must_ be extended to offer translation of very old offsets, and the
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication
gharris1727 commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1173998391 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); -offsetSyncs.put(sourceTopicPartition, offsetSync); +offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); +offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); +} + +private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { +// Make a copy of the array before mutating it, so that readers do not see inconsistent data +// TODO: batch updates so that this copy can be performed less often for high-volume sync topics. +OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); +updateSyncArray(mutableSyncs, offsetSync); +if (log.isTraceEnabled()) { +StringBuilder stateString = new StringBuilder(); +stateString.append("["); +for (int i = 0; i < Long.SIZE; i++) { +if (i != 0) { +stateString.append(","); +} +if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) { +// Print only if the sync is interesting, a series of repeated syncs will appear as , +stateString.append(mutableSyncs[i].upstreamOffset()); +stateString.append(":"); +stateString.append(mutableSyncs[i].downstreamOffset()); +} +} +stateString.append("]"); +log.trace("New sync {} applied, new state is {}", offsetSync, stateString); +} +return mutableSyncs; +} + +private OffsetSync[] createInitialSyncs(OffsetSync firstSync) { +OffsetSync[] syncs = new OffsetSync[Long.SIZE]; +clearSyncArray(syncs, firstSync); +return syncs; +} + +private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { +for (int i = 0; i < Long.SIZE; i++) { +syncs[i] = 
offsetSync; +} +} + +private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { +long upstreamOffset = offsetSync.upstreamOffset(); +// Old offsets are invalid, so overwrite them all. +if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) { Review Comment: @urbandan > Monotonicity is an optimization - it tries to minimize re-processing after a failover. (Please correct me if I'm wrong, but I don't really see any other user stories behind it.) Yes this is correct, but I think it misses some nuance. The overall feature (offset translation, checkpoints, syncing offsets to the consumer group) is an optimization: users could just replicate their data, and start reading the replica from the beginning. And relevant to your earlier simplicity argument: it would be simpler to not offer offset translation, but would require users to re-deliver more data. In the same way, monotonicity is an optimization to not rewind checkpoints when we have already translated better ones: We could choose to not offer monotonicity for the simplicity but (with this specific in-memory store implementation) that means that if you are N offsets behind the replication flow, your offset could fall backward by another N offsets, doubling the data re-delivery. I would also consider the user-story of someone monitoring their cluster, and noticing that despite consumers and replication moving forward, the checkpoints topic is moving _backwards_. Though no error has occurred, the result of the offset translation is getting worse. To someone unfamiliar with the internal workings of this algorithm, it looks like we're just generating arbitrary offsets. Actually we were already doing that before KAFKA-13659, and someone went through the effort to figure out why their offsets went backwards each time the connector restarted to open a ticket. 
Additionally, it increases the difference in semantics between using the raw checkpoints topic and synced consumer offsets, which I think is more likely to lead to confusion. > Being able to checkpoint old offsets after a restart is a feature, and probably a good one. If cluster restarts/rebalances are frequent enough, and some consumers are lagging behind consistently, they might never get their checkpoints translated, ever. I completely agree. I think that offset translation _must_ be extended to offer translation of very old offsets, and the solution in this PR is just part of that
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication
gharris1727 commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1173998391 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); -offsetSyncs.put(sourceTopicPartition, offsetSync); +offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); +offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); +} + +private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { +// Make a copy of the array before mutating it, so that readers do not see inconsistent data +// TODO: batch updates so that this copy can be performed less often for high-volume sync topics. +OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); +updateSyncArray(mutableSyncs, offsetSync); +if (log.isTraceEnabled()) { +StringBuilder stateString = new StringBuilder(); +stateString.append("["); +for (int i = 0; i < Long.SIZE; i++) { +if (i != 0) { +stateString.append(","); +} +if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) { +// Print only if the sync is interesting, a series of repeated syncs will appear as , +stateString.append(mutableSyncs[i].upstreamOffset()); +stateString.append(":"); +stateString.append(mutableSyncs[i].downstreamOffset()); +} +} +stateString.append("]"); +log.trace("New sync {} applied, new state is {}", offsetSync, stateString); +} +return mutableSyncs; +} + +private OffsetSync[] createInitialSyncs(OffsetSync firstSync) { +OffsetSync[] syncs = new OffsetSync[Long.SIZE]; +clearSyncArray(syncs, firstSync); +return syncs; +} + +private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { +for (int i = 0; i < Long.SIZE; i++) { +syncs[i] = 
offsetSync; +} +} + +private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { +long upstreamOffset = offsetSync.upstreamOffset(); +// Old offsets are invalid, so overwrite them all. +if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) { Review Comment: @urbandan > Monotonicity is an optimization - it tries to minimize re-processing after a failover. (Please correct me if I'm wrong, but I don't really see any other user stories behind it.) Yes this is correct, but I think it misses some nuance. The overall feature (offset translation, checkpoints, syncing offsets to the consumer group) is an optimization: users could just replicate their data, and start reading the replica from the beginning. And relevant to your earlier simplicity argument: it would be simpler to not offer offset translation, but would require users to re-deliver more data. In the same way, monotonicity is an optimization to not rewind checkpoints when we have already translated better ones: We could choose to not offer monotonicity for the simplicity but (with this specific in-memory store implementation) that means that if you are N offsets behind the replication flow, your offset could fall backward by another N offsets, doubling the data re-delivery. I would also consider the user-story of someone monitoring their cluster, and noticing that despite consumers and replication moving forward, the checkpoints topic is moving _backwards_. Though no error has occurred, the result of the offset translation is getting worse. To someone unfamiliar with the internal workings of this algorithm, it looks like we're just generating arbitrary offsets. Additionally, it causes a significant difference in semantics between using the raw checkpoints topic and synced consumer offsets, which I think is more likely to lead to confusion. > Being able to checkpoint old offsets after a restart is a feature, and probably a good one. 
If cluster restarts/rebalances are frequent enough, and some consumers are lagging behind consistently, they might never get their checkpoints translated, ever. I completely agree. I think that offset translation _must_ be extended to offer translation of very old offsets, and the solution in this PR is just part of that overall solution. The limitation you pointed out is a very important one, as it's the core limitation that this PR is trying to solve. Before this PR, consumers lagging behind the latest
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication
gharris1727 commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1172772709 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); -offsetSyncs.put(sourceTopicPartition, offsetSync); +offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); +offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); +} + +private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { +// Make a copy of the array before mutating it, so that readers do not see inconsistent data +// TODO: batch updates so that this copy can be performed less often for high-volume sync topics. +OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); +updateSyncArray(mutableSyncs, offsetSync); +if (log.isTraceEnabled()) { +StringBuilder stateString = new StringBuilder(); +stateString.append("["); +for (int i = 0; i < Long.SIZE; i++) { +if (i != 0) { +stateString.append(","); +} +if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) { +// Print only if the sync is interesting, a series of repeated syncs will appear as , +stateString.append(mutableSyncs[i].upstreamOffset()); +stateString.append(":"); +stateString.append(mutableSyncs[i].downstreamOffset()); +} +} +stateString.append("]"); +log.trace("New sync {} applied, new state is {}", offsetSync, stateString); +} +return mutableSyncs; +} + +private OffsetSync[] createInitialSyncs(OffsetSync firstSync) { +OffsetSync[] syncs = new OffsetSync[Long.SIZE]; +clearSyncArray(syncs, firstSync); +return syncs; +} + +private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { +for (int i = 0; i < Long.SIZE; i++) { +syncs[i] = 
offsetSync; +} +} + +private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { +long upstreamOffset = offsetSync.upstreamOffset(); +// Old offsets are invalid, so overwrite them all. +if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) { Review Comment: @urbandan Before https://issues.apache.org/jira/browse/KAFKA-13659 and https://issues.apache.org/jira/browse/KAFKA-12468 this logic did not have monotonicity guarantees, but that does not mean that behavior was acceptable to users. These tickets were opened and voted on by multiple users before we took action. If we eliminate the in-memory deduplication and the forced read-to-end in this PR, we will re-introduce behavior that we have already fixed on trunk. I do not think that is acceptable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication
gharris1727 commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1171513542 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); -offsetSyncs.put(sourceTopicPartition, offsetSync); +offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); +offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); +} + +private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { +// Make a copy of the array before mutating it, so that readers do not see inconsistent data +// TODO: batch updates so that this copy can be performed less often for high-volume sync topics. +OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); +updateSyncArray(mutableSyncs, offsetSync); +if (log.isTraceEnabled()) { +StringBuilder stateString = new StringBuilder(); +stateString.append("["); +for (int i = 0; i < Long.SIZE; i++) { +if (i != 0) { +stateString.append(","); +} +if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) { +// Print only if the sync is interesting, a series of repeated syncs will appear as , +stateString.append(mutableSyncs[i].upstreamOffset()); +stateString.append(":"); +stateString.append(mutableSyncs[i].downstreamOffset()); +} +} +stateString.append("]"); +log.trace("New sync {} applied, new state is {}", offsetSync, stateString); +} +return mutableSyncs; +} + +private OffsetSync[] createInitialSyncs(OffsetSync firstSync) { +OffsetSync[] syncs = new OffsetSync[Long.SIZE]; +clearSyncArray(syncs, firstSync); +return syncs; +} + +private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { +for (int i = 0; i < Long.SIZE; i++) { +syncs[i] = 
offsetSync; +} +} + +private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { +long upstreamOffset = offsetSync.upstreamOffset(); +// Old offsets are invalid, so overwrite them all. +if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) { Review Comment: @urbandan I hadn't considered using the source offsets, and this is worth discussing more in a follow-up, as I think the current implementation with its limitations is all that we can afford for the upcoming release. Compared to re-reading the checkpoints topic, the advantages of adding checkpoint source offsets are: 1. We can use the source offsets consumer in the framework 2. The framework will compact the data for us. The disadvantages are: 1. Deletion of the offset syncs loses monotonicity guarantees 2. In non-EOS mode we lose monotonicity guarantees (source offsets arrive after checkpoints are written) 3. New load to the global source offsets topic that was not there before, hosting duplicate data. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication
gharris1727 commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1159169763 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ## @@ -57,52 +57,96 @@ public void testOffsetTranslation() { // Emit synced downstream offset without dead-reckoning store.sync(tp, 100, 200); -assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150)); +assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150)); // Translate exact offsets store.sync(tp, 150, 251); -assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150)); +assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150)); // Use old offset (5) prior to any sync -> can't translate -assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5)); +assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5)); // Downstream offsets reset store.sync(tp, 200, 10); -assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200)); +assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200)); // Upstream offsets reset store.sync(tp, 20, 20); -assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20)); +assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20)); } } @Test public void testNoTranslationIfStoreNotStarted() { try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { // no offsets exist and store is not started -assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0)); -assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100)); -assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200)); // read a sync during startup store.sync(tp, 100, 200); 
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0)); -assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100)); -assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200)); // After the store is started all offsets are visible store.start(); -assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0)); -assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100)); -assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200)); +assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0)); +assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100)); +assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200)); } } @Test public void testNoTranslationIfNoOffsetSync() { try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { store.start(); -assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); } } + +@Test +public void testPastOffsetTranslation() { +try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { +long maxOffsetLag = 10; +int offset = 0; +for (; offset <= 1000; offset += maxOffsetLag) { +store.sync(tp, offset, offset); +} +store.start(); + +// After starting but before seeing new offsets, only the latest startup offset can be translated +assertSparseSync(store, 1000, -1); + +for (; offset <= 2000; offset += maxOffsetLag) { +store.sync(tp, offset, offset); +} + +// After seeing new offsets, we still cannot translate earlier than the latest startup offset +assertSparseSync(store, 1000, -1); + +// We can translate offsets between the latest startup offset and the latest offset with variable precision +// Older 
offsets are less precise and translation ends up farther apart +assertSparseSync(store,
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication
gharris1727 commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1159169763 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ## @@ -57,52 +57,96 @@ public void testOffsetTranslation() { // Emit synced downstream offset without dead-reckoning store.sync(tp, 100, 200); -assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150)); +assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150)); // Translate exact offsets store.sync(tp, 150, 251); -assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150)); +assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150)); // Use old offset (5) prior to any sync -> can't translate -assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5)); +assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5)); // Downstream offsets reset store.sync(tp, 200, 10); -assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200)); +assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200)); // Upstream offsets reset store.sync(tp, 20, 20); -assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20)); +assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20)); } } @Test public void testNoTranslationIfStoreNotStarted() { try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { // no offsets exist and store is not started -assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0)); -assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100)); -assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200)); // read a sync during startup store.sync(tp, 100, 200); 
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0)); -assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100)); -assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200)); // After the store is started all offsets are visible store.start(); -assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0)); -assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100)); -assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200)); +assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0)); +assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100)); +assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200)); } } @Test public void testNoTranslationIfNoOffsetSync() { try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { store.start(); -assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0)); +assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); } } + +@Test +public void testPastOffsetTranslation() { +try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { +long maxOffsetLag = 10; +int offset = 0; +for (; offset <= 1000; offset += maxOffsetLag) { +store.sync(tp, offset, offset); +} +store.start(); + +// After starting but before seeing new offsets, only the latest startup offset can be translated +assertSparseSync(store, 1000, -1); + +for (; offset <= 2000; offset += maxOffsetLag) { +store.sync(tp, offset, offset); +} + +// After seeing new offsets, we still cannot translate earlier than the latest startup offset +assertSparseSync(store, 1000, -1); + +// We can translate offsets between the latest startup offset and the latest offset with variable precision +// Older 
offsets are less precise and translation ends up farther apart +assertSparseSync(store,
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication
gharris1727 commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1159083568 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -25,17 +25,39 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.util.KafkaBasedLog; import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.ConcurrentHashMap; -/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */ +/** + * Used internally by MirrorMaker. Stores offset syncs and performs offset translation. + * A limited number of offset syncs can be stored per TopicPartition, in a way which provides better translation + * later in the topic, closer to the live end of the topic. + * This maintains the following invariants for each topic-partition in the in-memory sync storage: + * + * Invariant A: syncs[0] is the latest offset sync from the syncs topic + * Invariant B: For each i,j, i <= j: syncs[j].upstream <= syncs[i].upstream < syncs[j].upstream + 2^j Review Comment: I split this invariant into two parts: the upper bound and lower bound. I also made both of the bounds stronger to promote a more effective distribution of syncs. This means that the old logic of bit comparisons is gone now, and the invariants actually drive the algorithm directly, instead of the bit comparisons happening to satisfy the invariants but not actually having anything to do with them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication
gharris1727 commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1149859768 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); -offsetSyncs.put(sourceTopicPartition, offsetSync); +offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); +offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); +} + +private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { +// Make a copy of the array before mutating it, so that readers do not see inconsistent data +// TODO: batch updates so that this copy can be performed less often for high-volume sync topics. +OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); +updateSyncArray(mutableSyncs, offsetSync); +if (log.isTraceEnabled()) { +StringBuilder stateString = new StringBuilder(); +stateString.append("["); +for (int i = 0; i < Long.SIZE; i++) { +if (i != 0) { +stateString.append(","); +} +if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) { +// Print only if the sync is interesting, a series of repeated syncs will appear as , +stateString.append(mutableSyncs[i].upstreamOffset()); +stateString.append(":"); +stateString.append(mutableSyncs[i].downstreamOffset()); +} +} +stateString.append("]"); +log.trace("New sync {} applied, new state is {}", offsetSync, stateString); Review Comment: I'll keep the if guard and the log in the method itself, but extract the stringifying to a new method. 
## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); -offsetSyncs.put(sourceTopicPartition, offsetSync); +offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); +offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); +} + +private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { +// Make a copy of the array before mutating it, so that readers do not see inconsistent data +// TODO: batch updates so that this copy can be performed less often for high-volume sync topics. Review Comment: I'm not sure that this is a substantial improvement. Assuming a topic name of n bytes, it saves an 8*64 = 512-byte allocation and copy, in addition to the existing ~ 32 + 2n allocation that deserializing an offset sync takes. I'll make the comment less committal. ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); -offsetSyncs.put(sourceTopicPartition, offsetSync); +offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); +offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); +} + +private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { +// Make a copy of the array before mutating it, so that readers do not see inconsistent data +// TODO: batch updates so that this copy can be performed less often for high-volume sync topics. 
+OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); +updateSyncArray(mutableSyncs, offsetSync); +if (log.isTraceEnabled()) { +StringBuilder stateString = new StringBuilder(); +stateString.append("["); +for (int i = 0; i < Long.SIZE; i++) { +if (i != 0) { +stateString.append(","); +} +if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) { +// Print only if the sync is interesting, a series of repeated syncs will appear as , +