[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-25 Thread via GitHub


gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1176536050


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
 // Emit synced downstream offset without dead-reckoning
 store.sync(tp, 100, 200);
-assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 
150));
+assertEquals(OptionalLong.of(201), store.translateDownstream(null, 
tp, 150));
 
 // Translate exact offsets
 store.sync(tp, 150, 251);
-assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 
150));
+assertEquals(OptionalLong.of(251), store.translateDownstream(null, 
tp, 150));
 
 // Use old offset (5) prior to any sync -> can't translate
-assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 
5));
+assertEquals(OptionalLong.of(-1), store.translateDownstream(null, 
tp, 5));
 
 // Downstream offsets reset
 store.sync(tp, 200, 10);
-assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 
200));
+assertEquals(OptionalLong.of(10), store.translateDownstream(null, 
tp, 200));
 
 // Upstream offsets reset
 store.sync(tp, 20, 20);
-assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 
20));
+assertEquals(OptionalLong.of(20), store.translateDownstream(null, 
tp, 20));
 }
 }
 
 @Test
 public void testNoTranslationIfStoreNotStarted() {
 try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
 // no offsets exist and store is not started
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
0));
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
100));
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
200));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
 
 // read a sync during startup
 store.sync(tp, 100, 200);
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
0));
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
100));
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
200));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
 
 // After the store is started all offsets are visible
 store.start();
-assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 
0));
-assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 
100));
-assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 
200));
+assertEquals(OptionalLong.of(-1), store.translateDownstream(null, 
tp, 0));
+assertEquals(OptionalLong.of(200), store.translateDownstream(null, 
tp, 100));
+assertEquals(OptionalLong.of(201), store.translateDownstream(null, 
tp, 200));
 }
 }
 
 @Test
 public void testNoTranslationIfNoOffsetSync() {
 try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
 store.start();
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
0));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
+}
+}
+
+@Test
+public void testPastOffsetTranslation() {
+try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+long maxOffsetLag = 10;
+int offset = 0;
+for (; offset <= 1000; offset += maxOffsetLag) {
+store.sync(tp, offset, offset);
+assertSparseSyncInvariant(store, tp);
+}
+store.start();
+
+// After starting but before seeing new offsets, only the latest 
startup offset can be translated
+assertSparseSync(store, 1000, -1);
+
+for (; offset <= 1; offset += maxOffsetLag) {
+store.sync(tp, offset, offset);
+assertSparseSyncInvariant(store, tp);
+}
+
+// After seeing new offsets, we still cannot translate earlier 
than the latest startup offset
+// Invariant D: the last sync from the initial read-to-end is 
still stored
+assertSparseSync(store, 1000, -1);
+
+// We can translate offsets between 

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-25 Thread via GitHub


gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1176247653


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +180,141 @@ public void close() {
 protected void handleRecord(ConsumerRecord record) {
 OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
 TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-offsetSyncs.put(sourceTopicPartition, offsetSync);
+offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) ->
+syncs == null ? createInitialSyncs(offsetSync) : 
updateExistingSyncs(syncs, offsetSync)
+);
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+// Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+// TODO: consider batching updates so that this copy can be performed 
less often for high-volume sync topics.
+OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
+updateSyncArray(mutableSyncs, offsetSync);
+if (log.isTraceEnabled()) {
+log.trace("New sync {} applied, new state is {}", offsetSync, 
offsetArrayToString(mutableSyncs));
+}
+return mutableSyncs;
+}
+
+private String offsetArrayToString(OffsetSync[] syncs) {
+StringBuilder stateString = new StringBuilder();
+stateString.append("[");
+for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+if (i == 0 || syncs[i] != syncs[i - 1]) {
+if (i != 0) {
+stateString.append(",");
+}
+// Print only if the sync is interesting, a series of repeated 
syncs will be elided
+stateString.append(syncs[i].upstreamOffset());
+stateString.append(":");
+stateString.append(syncs[i].downstreamOffset());
+}
+}
+stateString.append("]");
+return stateString.toString();
+}
+
+private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION];
+clearSyncArray(syncs, firstSync);
+return syncs;
+}
+
+private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+// If every element of the store is the same, then it satisfies 
invariants B and C trivially.
+for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+syncs[i] = offsetSync;
+}
+}
+
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+long upstreamOffset = offsetSync.upstreamOffset();
+// Old offsets are invalid, so overwrite them all.
+if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+clearSyncArray(syncs, offsetSync);
+return;
+}
+OffsetSync replacement = offsetSync;
+OffsetSync oldValue = syncs[0];
+// Invariant A is always violated once a new sync appears.
+// Repair Invariant A: the latest sync must always be updated
+syncs[0] = replacement;
+for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
+int previous = current - 1;
+
+// Consider using oldValue instead of replacement, which allows us 
to keep more distinct values stored
+// If oldValue is not recent, it should be expired from the store
+boolean isRecent = invariantB(syncs[previous], oldValue, previous, 
current);
+// Ensure that this value is sufficiently separated from the 
previous value
+// We prefer to keep more recent syncs of similar precision (i.e. 
the value in replacement)
+boolean separatedFromPrevious = invariantC(syncs[previous], 
oldValue, previous, current);
+// Ensure that this value is sufficiently separated from the next 
value
+// We prefer to keep existing syncs of lower precision (i.e. the 
value in syncs[next])
+int next = current + 1;
+boolean separatedFromNext = next >= SYNCS_PER_PARTITION || 
invariantC(oldValue, syncs[next], current, next);
+// If this condition is false, oldValue will be expired from the 
store and lost forever.
+if (isRecent && separatedFromPrevious && separatedFromNext) {
+replacement = oldValue;
+}
+
+// The replacement variable always contains a value which 
satisfies the invariants for this index.
+assert invariantB(syncs[previous], replacement, previous, current);
+assert invariantC(syncs[previous], replacement, previous, current);
+
+// Test if changes to the previous index affected the invariant 
for this index
+if (invariantB(syncs[previous], syncs[current], previous, 
current)) {
+

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-25 Thread via GitHub


gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1176205390


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +180,141 @@ public void close() {
 protected void handleRecord(ConsumerRecord record) {
 OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
 TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-offsetSyncs.put(sourceTopicPartition, offsetSync);
+offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) ->
+syncs == null ? createInitialSyncs(offsetSync) : 
updateExistingSyncs(syncs, offsetSync)
+);
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+// Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+// TODO: consider batching updates so that this copy can be performed 
less often for high-volume sync topics.
+OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
+updateSyncArray(mutableSyncs, offsetSync);
+if (log.isTraceEnabled()) {
+log.trace("New sync {} applied, new state is {}", offsetSync, 
offsetArrayToString(mutableSyncs));
+}
+return mutableSyncs;
+}
+
+private String offsetArrayToString(OffsetSync[] syncs) {
+StringBuilder stateString = new StringBuilder();
+stateString.append("[");
+for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+if (i == 0 || syncs[i] != syncs[i - 1]) {
+if (i != 0) {
+stateString.append(",");
+}
+// Print only if the sync is interesting, a series of repeated 
syncs will be elided
+stateString.append(syncs[i].upstreamOffset());
+stateString.append(":");
+stateString.append(syncs[i].downstreamOffset());
+}
+}
+stateString.append("]");
+return stateString.toString();
+}
+
+private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION];
+clearSyncArray(syncs, firstSync);
+return syncs;
+}
+
+private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+// If every element of the store is the same, then it satisfies 
invariants B and C trivially.
+for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+syncs[i] = offsetSync;
+}
+}
+
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+long upstreamOffset = offsetSync.upstreamOffset();
+// Old offsets are invalid, so overwrite them all.
+if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+clearSyncArray(syncs, offsetSync);
+return;
+}
+OffsetSync replacement = offsetSync;
+OffsetSync oldValue = syncs[0];
+// Invariant A is always violated once a new sync appears.
+// Repair Invariant A: the latest sync must always be updated
+syncs[0] = replacement;
+for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
+int previous = current - 1;
+
+// Consider using oldValue instead of replacement, which allows us 
to keep more distinct values stored
+// If oldValue is not recent, it should be expired from the store
+boolean isRecent = invariantB(syncs[previous], oldValue, previous, 
current);
+// Ensure that this value is sufficiently separated from the 
previous value
+// We prefer to keep more recent syncs of similar precision (i.e. 
the value in replacement)
+boolean separatedFromPrevious = invariantC(syncs[previous], 
oldValue, previous, current);
+// Ensure that this value is sufficiently separated from the next 
value
+// We prefer to keep existing syncs of lower precision (i.e. the 
value in syncs[next])
+int next = current + 1;
+boolean separatedFromNext = next >= SYNCS_PER_PARTITION || 
invariantC(oldValue, syncs[next], current, next);
+// If this condition is false, oldValue will be expired from the 
store and lost forever.
+if (isRecent && separatedFromPrevious && separatedFromNext) {
+replacement = oldValue;
+}
+
+// The replacement variable always contains a value which 
satisfies the invariants for this index.
+assert invariantB(syncs[previous], replacement, previous, current);
+assert invariantC(syncs[previous], replacement, previous, current);
+
+// Test if changes to the previous index affected the invariant 
for this index
+if (invariantB(syncs[previous], syncs[current], previous, 
current)) {
+

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-25 Thread via GitHub


gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1176202013


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +180,141 @@ public void close() {
 protected void handleRecord(ConsumerRecord record) {
 OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
 TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-offsetSyncs.put(sourceTopicPartition, offsetSync);
+offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) ->
+syncs == null ? createInitialSyncs(offsetSync) : 
updateExistingSyncs(syncs, offsetSync)
+);
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+// Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+// TODO: consider batching updates so that this copy can be performed 
less often for high-volume sync topics.
+OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
+updateSyncArray(mutableSyncs, offsetSync);
+if (log.isTraceEnabled()) {
+log.trace("New sync {} applied, new state is {}", offsetSync, 
offsetArrayToString(mutableSyncs));
+}
+return mutableSyncs;
+}
+
+private String offsetArrayToString(OffsetSync[] syncs) {
+StringBuilder stateString = new StringBuilder();
+stateString.append("[");
+for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+if (i == 0 || syncs[i] != syncs[i - 1]) {
+if (i != 0) {
+stateString.append(",");
+}
+// Print only if the sync is interesting, a series of repeated 
syncs will be elided
+stateString.append(syncs[i].upstreamOffset());
+stateString.append(":");
+stateString.append(syncs[i].downstreamOffset());
+}
+}
+stateString.append("]");
+return stateString.toString();
+}
+
+private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION];
+clearSyncArray(syncs, firstSync);
+return syncs;
+}
+
+private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+// If every element of the store is the same, then it satisfies 
invariants B and C trivially.
+for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+syncs[i] = offsetSync;
+}
+}
+
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+long upstreamOffset = offsetSync.upstreamOffset();
+// Old offsets are invalid, so overwrite them all.
+if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+clearSyncArray(syncs, offsetSync);
+return;
+}
+OffsetSync replacement = offsetSync;
+OffsetSync oldValue = syncs[0];
+// Invariant A is always violated once a new sync appears.
+// Repair Invariant A: the latest sync must always be updated
+syncs[0] = replacement;
+for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
+int previous = current - 1;
+
+// Consider using oldValue instead of replacement, which allows us 
to keep more distinct values stored
+// If oldValue is not recent, it should be expired from the store
+boolean isRecent = invariantB(syncs[previous], oldValue, previous, 
current);
+// Ensure that this value is sufficiently separated from the 
previous value
+// We prefer to keep more recent syncs of similar precision (i.e. 
the value in replacement)
+boolean separatedFromPrevious = invariantC(syncs[previous], 
oldValue, previous, current);
+// Ensure that this value is sufficiently separated from the next 
value
+// We prefer to keep existing syncs of lower precision (i.e. the 
value in syncs[next])
+int next = current + 1;
+boolean separatedFromNext = next >= SYNCS_PER_PARTITION || 
invariantC(oldValue, syncs[next], current, next);
+// If this condition is false, oldValue will be expired from the 
store and lost forever.
+if (isRecent && separatedFromPrevious && separatedFromNext) {
+replacement = oldValue;
+}
+
+// The replacement variable always contains a value which 
satisfies the invariants for this index.
+assert invariantB(syncs[previous], replacement, previous, current);
+assert invariantC(syncs[previous], replacement, previous, current);
+
+// Test if changes to the previous index affected the invariant 
for this index
+if (invariantB(syncs[previous], syncs[current], previous, 
current)) {
+

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-21 Thread via GitHub


gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1173998391


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +171,103 @@ public void close() {
 protected void handleRecord(ConsumerRecord record) {
 OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
 TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-offsetSyncs.put(sourceTopicPartition, offsetSync);
+offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> 
createInitialSyncs(offsetSync));
+offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> 
updateExistingSyncs(syncs, offsetSync));
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+// Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+// TODO: batch updates so that this copy can be performed less often 
for high-volume sync topics.
+OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+updateSyncArray(mutableSyncs, offsetSync);
+if (log.isTraceEnabled()) {
+StringBuilder stateString = new StringBuilder();
+stateString.append("[");
+for (int i = 0; i < Long.SIZE; i++) {
+if (i != 0) {
+stateString.append(",");
+}
+if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != 
mutableSyncs[i - 1]) {
+// Print only if the sync is interesting, a series of 
repeated syncs will appear as ,
+stateString.append(mutableSyncs[i].upstreamOffset());
+stateString.append(":");
+stateString.append(mutableSyncs[i].downstreamOffset());
+}
+}
+stateString.append("]");
+log.trace("New sync {} applied, new state is {}", offsetSync, 
stateString);
+}
+return mutableSyncs;
+}
+
+private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+clearSyncArray(syncs, firstSync);
+return syncs;
+}
+
+private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+for (int i = 0; i < Long.SIZE; i++) {
+syncs[i] = offsetSync;
+}
+}
+
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+long upstreamOffset = offsetSync.upstreamOffset();
+// Old offsets are invalid, so overwrite them all.
+if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   @urbandan 
   
   > Monotonicity is an optimization - it tries to minimize re-processing after 
a failover. (Please correct me if I'm wrong, but I don't really see any other 
user stories behind it.)
   
   Yes this is correct, but I think it misses some nuance. The overall feature 
(offset translation, checkpoints, syncing offsets to the consumer group) is an 
optimization: users could just replicate their data, and start reading the 
replica from the beginning. And relevant to your earlier simplicity argument: 
it would be simpler to not offer offset translation, but would require users to 
re-deliver more data.
   
   In the same way, monotonicity is an optimization to not rewind checkpoints 
when we have already translated better ones: We could choose to not offer 
monotonicity for the simplicity but (with this specific in-memory store 
implementation) that means that if you are N offsets behind the replication 
flow, your offset could fall backward by another N offsets, doubling the data 
re-delivery. 
   
   I would also consider the user-story of someone monitoring their cluster, 
and noticing that despite consumers and replication moving forward, the 
checkpoints topic is moving _backwards_. Though no error has occurred, the 
result of the offset translation is getting worse. To someone unfamiliar with 
the internal workings of this algorithm, it looks like we're just generating 
arbitrary offsets. Actually we were already doing that before KAFKA-13659, and 
someone went through the effort to figure out why their offsets went backwards 
each time the connector restarted, and opened a ticket that multiple people 
voted on. I think you can make the argument that monotonicity is an 
optimization that we can drop for convenience, but that doesn't change the fact 
that users are expecting it without an explicit contract.
   
   > Being able to checkpoint old offsets after a restart is a feature, and 
probably a good one. If cluster restarts/rebalances are frequent enough, and 
some consumers are lagging behind consistently, they might never get their 
checkpoints translated, ever.
   
   I completely agree. I think that offset translation _must_ be extended to 
offer translation of very old 

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-21 Thread via GitHub


gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1173998391


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +171,103 @@ public void close() {
 protected void handleRecord(ConsumerRecord record) {
 OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
 TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-offsetSyncs.put(sourceTopicPartition, offsetSync);
+offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> 
createInitialSyncs(offsetSync));
+offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> 
updateExistingSyncs(syncs, offsetSync));
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+// Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+// TODO: batch updates so that this copy can be performed less often 
for high-volume sync topics.
+OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+updateSyncArray(mutableSyncs, offsetSync);
+if (log.isTraceEnabled()) {
+StringBuilder stateString = new StringBuilder();
+stateString.append("[");
+for (int i = 0; i < Long.SIZE; i++) {
+if (i != 0) {
+stateString.append(",");
+}
+if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != 
mutableSyncs[i - 1]) {
+// Print only if the sync is interesting, a series of 
repeated syncs will appear as ,
+stateString.append(mutableSyncs[i].upstreamOffset());
+stateString.append(":");
+stateString.append(mutableSyncs[i].downstreamOffset());
+}
+}
+stateString.append("]");
+log.trace("New sync {} applied, new state is {}", offsetSync, 
stateString);
+}
+return mutableSyncs;
+}
+
+private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+clearSyncArray(syncs, firstSync);
+return syncs;
+}
+
+private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+for (int i = 0; i < Long.SIZE; i++) {
+syncs[i] = offsetSync;
+}
+}
+
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+long upstreamOffset = offsetSync.upstreamOffset();
+// Old offsets are invalid, so overwrite them all.
+if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   @urbandan 
   
   > Monotonicity is an optimization - it tries to minimize re-processing after 
a failover. (Please correct me if I'm wrong, but I don't really see any other 
user stories behind it.)
   
   Yes this is correct, but I think it misses some nuance. The overall feature 
(offset translation, checkpoints, syncing offsets to the consumer group) is an 
optimization: users could just replicate their data, and start reading the 
replica from the beginning. And relevant to your earlier simplicity argument: 
it would be simpler to not offer offset translation, but would require users to 
re-deliver more data.
   
   In the same way, monotonicity is an optimization to not rewind checkpoints 
when we have already translated better ones: We could choose to not offer 
monotonicity for the simplicity but (with this specific in-memory store 
implementation) that means that if you are N offsets behind the replication 
flow, your offset could fall backward by another N offsets, doubling the data 
re-delivery. 
   
   I would also consider the user-story of someone monitoring their cluster, 
and noticing that despite consumers and replication moving forward, the 
checkpoints topic is moving _backwards_. Though no error has occurred, the 
result of the offset translation is getting worse. To someone unfamiliar with 
the internal workings of this algorithm, it looks like we're just generating 
arbitrary offsets. Actually we were already doing that before KAFKA-13659, and 
someone went through the effort to figure out why their offsets went backwards 
each time the connector restarted, and opened a ticket that multiple people 
voted on. I think you can make the argument that monotonicity is an 
optimization that we can drop for convenience, but I think users are expecting 
it based on the context of MM2 as a feature.
   
   > Being able to checkpoint old offsets after a restart is a feature, and 
probably a good one. If cluster restarts/rebalances are frequent enough, and 
some consumers are lagging behind consistently, they might never get their 
checkpoints translated, ever.
   
   I completely agree. I think that offset translation _must_ be extended to 
offer translation of very old offsets, and the 

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-21 Thread via GitHub


gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1173998391


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +171,103 @@ public void close() {
 protected void handleRecord(ConsumerRecord record) {
 OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
 TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-offsetSyncs.put(sourceTopicPartition, offsetSync);
+offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> 
createInitialSyncs(offsetSync));
+offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> 
updateExistingSyncs(syncs, offsetSync));
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+// Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+// TODO: batch updates so that this copy can be performed less often 
for high-volume sync topics.
+OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+updateSyncArray(mutableSyncs, offsetSync);
+if (log.isTraceEnabled()) {
+StringBuilder stateString = new StringBuilder();
+stateString.append("[");
+for (int i = 0; i < Long.SIZE; i++) {
+if (i != 0) {
+stateString.append(",");
+}
+if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != 
mutableSyncs[i - 1]) {
+// Print only if the sync is interesting, a series of 
repeated syncs will appear as ,
+stateString.append(mutableSyncs[i].upstreamOffset());
+stateString.append(":");
+stateString.append(mutableSyncs[i].downstreamOffset());
+}
+}
+stateString.append("]");
+log.trace("New sync {} applied, new state is {}", offsetSync, 
stateString);
+}
+return mutableSyncs;
+}
+
+private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+clearSyncArray(syncs, firstSync);
+return syncs;
+}
+
+private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+for (int i = 0; i < Long.SIZE; i++) {
+syncs[i] = offsetSync;
+}
+}
+
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+long upstreamOffset = offsetSync.upstreamOffset();
+// Old offsets are invalid, so overwrite them all.
+if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   @urbandan 
   
   > Monotonicity is an optimization - it tries to minimize re-processing after 
a failover. (Please correct me if I'm wrong, but I don't really see any other 
user stories behind it.)
   
   Yes this is correct, but I think it misses some nuance. The overall feature 
(offset translation, checkpoints, syncing offsets to the consumer group) is an 
optimization: users could just replicate their data, and start reading the 
replica from the beginning. And relevant to your earlier simplicity argument: 
it would be simpler to not offer offset translation, but would require users to 
re-deliver more data.
   
   In the same way, monotonicity is an optimization to not rewind checkpoints 
when we have already translated better ones: We could choose to not offer 
monotonicity for the simplicity but (with this specific in-memory store 
implementation) that means that if you are N offsets behind the replication 
flow, your offset could fall backward by another N offsets, doubling the data 
re-delivery. 
   
   I would also consider the user-story of someone monitoring their cluster, 
and noticing that despite consumers and replication moving forward, the 
checkpoints topic is moving _backwards_. Though no error has occurred, the 
result of the offset translation is getting worse. To someone unfamiliar with 
the internal workings of this algorithm, it looks like we're just generating 
arbitrary offsets. Actually we were already doing that before KAFKA-13659, and 
someone went through the effort to figure out why their offsets went backwards 
each time the connector restarted, and opened a ticket. Additionally, it increases 
the difference in semantics between using the raw checkpoints topic and synced 
consumer offsets, which I think is more likely to lead to confusion.
   
   > Being able to checkpoint old offsets after a restart is a feature, and 
probably a good one. If cluster restarts/rebalances are frequent enough, and 
some consumers are lagging behind consistently, they might never get their 
checkpoints translated, ever.
   
   I completely agree. I think that offset translation _must_ be extended to 
offer translation of very old offsets, and the solution in this PR is just part 
of that 

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-21 Thread via GitHub


gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1173998391


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +171,103 @@ public void close() {
 protected void handleRecord(ConsumerRecord record) {
 OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
 TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-offsetSyncs.put(sourceTopicPartition, offsetSync);
+offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> 
createInitialSyncs(offsetSync));
+offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> 
updateExistingSyncs(syncs, offsetSync));
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+// Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+// TODO: batch updates so that this copy can be performed less often 
for high-volume sync topics.
+OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+updateSyncArray(mutableSyncs, offsetSync);
+if (log.isTraceEnabled()) {
+StringBuilder stateString = new StringBuilder();
+stateString.append("[");
+for (int i = 0; i < Long.SIZE; i++) {
+if (i != 0) {
+stateString.append(",");
+}
+if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != 
mutableSyncs[i - 1]) {
+// Print only if the sync is interesting, a series of 
repeated syncs will appear as ,
+stateString.append(mutableSyncs[i].upstreamOffset());
+stateString.append(":");
+stateString.append(mutableSyncs[i].downstreamOffset());
+}
+}
+stateString.append("]");
+log.trace("New sync {} applied, new state is {}", offsetSync, 
stateString);
+}
+return mutableSyncs;
+}
+
+private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+clearSyncArray(syncs, firstSync);
+return syncs;
+}
+
+private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+for (int i = 0; i < Long.SIZE; i++) {
+syncs[i] = offsetSync;
+}
+}
+
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+long upstreamOffset = offsetSync.upstreamOffset();
+// Old offsets are invalid, so overwrite them all.
+if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   @urbandan 
   
   > Monotonicity is an optimization - it tries to minimize re-processing after 
a failover. (Please correct me if I'm wrong, but I don't really see any other 
user stories behind it.)
   
   Yes this is correct, but I think it misses some nuance. The overall feature 
(offset translation, checkpoints, syncing offsets to the consumer group) is an 
optimization: users could just replicate their data, and start reading the 
replica from the beginning. And relevant to your earlier simplicity argument: 
it would be simpler to not offer offset translation, but would require users to 
re-deliver more data.
   
   In the same way, monotonicity is an optimization to not rewind checkpoints 
when we have already translated better ones: We could choose to not offer 
monotonicity for the sake of simplicity, but (with this specific in-memory store 
implementation) that means that if you are N offsets behind the replication 
flow, your offset could fall backward by another N offsets, doubling the data 
re-delivery. 
   
   I would also consider the user-story of someone monitoring their cluster, 
and noticing that despite consumers and replication moving forward, the 
checkpoints topic is moving _backwards_. Though no error has occurred, the 
result of the offset translation is getting worse. To someone unfamiliar with 
the internal workings of this algorithm, it looks like we're just generating 
arbitrary offsets. Additionally, it causes a significant difference in semantics 
between using the raw checkpoints topic and synced consumer offsets, which I 
think is more likely to lead to confusion.
   
   > Being able to checkpoint old offsets after a restart is a feature, and 
probably a good one. If cluster restarts/rebalances are frequent enough, and 
some consumers are lagging behind consistently, they might never get their 
checkpoints translated, ever.
   
   I completely agree. I think that offset translation _must_ be extended to 
offer translation of very old offsets, and the solution in this PR is just part 
of that overall solution. The limitation you pointed out is a very important 
one, as it's the core limitation that this PR is trying to solve. Before this 
PR, consumers lagging behind the latest 

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-20 Thread via GitHub


gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1172772709


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +171,103 @@ public void close() {
 protected void handleRecord(ConsumerRecord record) {
 OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
 TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-offsetSyncs.put(sourceTopicPartition, offsetSync);
+offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> 
createInitialSyncs(offsetSync));
+offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> 
updateExistingSyncs(syncs, offsetSync));
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+// Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+// TODO: batch updates so that this copy can be performed less often 
for high-volume sync topics.
+OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+updateSyncArray(mutableSyncs, offsetSync);
+if (log.isTraceEnabled()) {
+StringBuilder stateString = new StringBuilder();
+stateString.append("[");
+for (int i = 0; i < Long.SIZE; i++) {
+if (i != 0) {
+stateString.append(",");
+}
+if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != 
mutableSyncs[i - 1]) {
+// Print only if the sync is interesting, a series of 
repeated syncs will appear as ,
+stateString.append(mutableSyncs[i].upstreamOffset());
+stateString.append(":");
+stateString.append(mutableSyncs[i].downstreamOffset());
+}
+}
+stateString.append("]");
+log.trace("New sync {} applied, new state is {}", offsetSync, 
stateString);
+}
+return mutableSyncs;
+}
+
+private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+clearSyncArray(syncs, firstSync);
+return syncs;
+}
+
+private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+for (int i = 0; i < Long.SIZE; i++) {
+syncs[i] = offsetSync;
+}
+}
+
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+long upstreamOffset = offsetSync.upstreamOffset();
+// Old offsets are invalid, so overwrite them all.
+if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   @urbandan Before https://issues.apache.org/jira/browse/KAFKA-13659 and 
https://issues.apache.org/jira/browse/KAFKA-12468 this logic did not have 
monotonicity guarantees, but that does not mean that behavior was acceptable to 
users. These tickets were opened and voted on by multiple users before we took 
action.
   
   If we eliminate the in-memory deduplication and the forced read-to-end in 
this PR, we will re-introduce behavior that we have already fixed on trunk. I 
do not think that is acceptable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-19 Thread via GitHub


gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1171513542


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +171,103 @@ public void close() {
 protected void handleRecord(ConsumerRecord record) {
 OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
 TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-offsetSyncs.put(sourceTopicPartition, offsetSync);
+offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> 
createInitialSyncs(offsetSync));
+offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> 
updateExistingSyncs(syncs, offsetSync));
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+// Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+// TODO: batch updates so that this copy can be performed less often 
for high-volume sync topics.
+OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+updateSyncArray(mutableSyncs, offsetSync);
+if (log.isTraceEnabled()) {
+StringBuilder stateString = new StringBuilder();
+stateString.append("[");
+for (int i = 0; i < Long.SIZE; i++) {
+if (i != 0) {
+stateString.append(",");
+}
+if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != 
mutableSyncs[i - 1]) {
+// Print only if the sync is interesting, a series of 
repeated syncs will appear as ,
+stateString.append(mutableSyncs[i].upstreamOffset());
+stateString.append(":");
+stateString.append(mutableSyncs[i].downstreamOffset());
+}
+}
+stateString.append("]");
+log.trace("New sync {} applied, new state is {}", offsetSync, 
stateString);
+}
+return mutableSyncs;
+}
+
+private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+clearSyncArray(syncs, firstSync);
+return syncs;
+}
+
+private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+for (int i = 0; i < Long.SIZE; i++) {
+syncs[i] = offsetSync;
+}
+}
+
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+long upstreamOffset = offsetSync.upstreamOffset();
+// Old offsets are invalid, so overwrite them all.
+if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   @urbandan I hadn't considered using the source offsets, and this is worth 
discussing more in a follow-up, as I think the current implementation with its 
limitations is all that we can afford for the upcoming release.
   
   Compared to re-reading the checkpoints topic, the advantages of adding 
checkpoint source offsets are:
   1. We can use the source offsets consumer in the framework
   2. The framework will compact the data for us.
   
   The disadvantages are:
   1. Deletion of the offset syncs loses monotonicity guarantees
   2. In non-EOS mode we lose monotonicity guarantees (source offsets arrive 
after checkpoints are written)
   3. New load to the global source offsets topic that was not there before, 
hosting duplicate data.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-05 Thread via GitHub


gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1159169763


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -57,52 +57,96 @@ public void testOffsetTranslation() {
 
 // Emit synced downstream offset without dead-reckoning
 store.sync(tp, 100, 200);
-assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 
150));
+assertEquals(OptionalLong.of(201), store.translateDownstream(null, 
tp, 150));
 
 // Translate exact offsets
 store.sync(tp, 150, 251);
-assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 
150));
+assertEquals(OptionalLong.of(251), store.translateDownstream(null, 
tp, 150));
 
 // Use old offset (5) prior to any sync -> can't translate
-assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 
5));
+assertEquals(OptionalLong.of(-1), store.translateDownstream(null, 
tp, 5));
 
 // Downstream offsets reset
 store.sync(tp, 200, 10);
-assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 
200));
+assertEquals(OptionalLong.of(10), store.translateDownstream(null, 
tp, 200));
 
 // Upstream offsets reset
 store.sync(tp, 20, 20);
-assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 
20));
+assertEquals(OptionalLong.of(20), store.translateDownstream(null, 
tp, 20));
 }
 }
 
 @Test
 public void testNoTranslationIfStoreNotStarted() {
 try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
 // no offsets exist and store is not started
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
0));
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
100));
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
200));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
 
 // read a sync during startup
 store.sync(tp, 100, 200);
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
0));
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
100));
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
200));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
 
 // After the store is started all offsets are visible
 store.start();
-assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 
0));
-assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 
100));
-assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 
200));
+assertEquals(OptionalLong.of(-1), store.translateDownstream(null, 
tp, 0));
+assertEquals(OptionalLong.of(200), store.translateDownstream(null, 
tp, 100));
+assertEquals(OptionalLong.of(201), store.translateDownstream(null, 
tp, 200));
 }
 }
 
 @Test
 public void testNoTranslationIfNoOffsetSync() {
 try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
 store.start();
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
0));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
 }
 }
+
+@Test
+public void testPastOffsetTranslation() {
+try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+long maxOffsetLag = 10;
+int offset = 0;
+for (; offset <= 1000; offset += maxOffsetLag) {
+store.sync(tp, offset, offset);
+}
+store.start();
+
+// After starting but before seeing new offsets, only the latest 
startup offset can be translated
+assertSparseSync(store, 1000, -1);
+
+for (; offset <= 2000; offset += maxOffsetLag) {
+store.sync(tp, offset, offset);
+}
+
+// After seeing new offsets, we still cannot translate earlier 
than the latest startup offset
+assertSparseSync(store, 1000, -1);
+
+// We can translate offsets between the latest startup offset and 
the latest offset with variable precision
+// Older offsets are less precise and translation ends up farther 
apart
+assertSparseSync(store, 

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-05 Thread via GitHub


gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1159169763


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -57,52 +57,96 @@ public void testOffsetTranslation() {
 
 // Emit synced downstream offset without dead-reckoning
 store.sync(tp, 100, 200);
-assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 
150));
+assertEquals(OptionalLong.of(201), store.translateDownstream(null, 
tp, 150));
 
 // Translate exact offsets
 store.sync(tp, 150, 251);
-assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 
150));
+assertEquals(OptionalLong.of(251), store.translateDownstream(null, 
tp, 150));
 
 // Use old offset (5) prior to any sync -> can't translate
-assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 
5));
+assertEquals(OptionalLong.of(-1), store.translateDownstream(null, 
tp, 5));
 
 // Downstream offsets reset
 store.sync(tp, 200, 10);
-assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 
200));
+assertEquals(OptionalLong.of(10), store.translateDownstream(null, 
tp, 200));
 
 // Upstream offsets reset
 store.sync(tp, 20, 20);
-assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 
20));
+assertEquals(OptionalLong.of(20), store.translateDownstream(null, 
tp, 20));
 }
 }
 
 @Test
 public void testNoTranslationIfStoreNotStarted() {
 try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
 // no offsets exist and store is not started
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
0));
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
100));
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
200));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
 
 // read a sync during startup
 store.sync(tp, 100, 200);
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
0));
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
100));
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
200));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
 
 // After the store is started all offsets are visible
 store.start();
-assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 
0));
-assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 
100));
-assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 
200));
+assertEquals(OptionalLong.of(-1), store.translateDownstream(null, 
tp, 0));
+assertEquals(OptionalLong.of(200), store.translateDownstream(null, 
tp, 100));
+assertEquals(OptionalLong.of(201), store.translateDownstream(null, 
tp, 200));
 }
 }
 
 @Test
 public void testNoTranslationIfNoOffsetSync() {
 try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
 store.start();
-assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
0));
+assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
 }
 }
+
+@Test
+public void testPastOffsetTranslation() {
+try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+long maxOffsetLag = 10;
+int offset = 0;
+for (; offset <= 1000; offset += maxOffsetLag) {
+store.sync(tp, offset, offset);
+}
+store.start();
+
+// After starting but before seeing new offsets, only the latest 
startup offset can be translated
+assertSparseSync(store, 1000, -1);
+
+for (; offset <= 2000; offset += maxOffsetLag) {
+store.sync(tp, offset, offset);
+}
+
+// After seeing new offsets, we still cannot translate earlier 
than the latest startup offset
+assertSparseSync(store, 1000, -1);
+
+// We can translate offsets between the latest startup offset and 
the latest offset with variable precision
+// Older offsets are less precise and translation ends up farther 
apart
+assertSparseSync(store, 

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-05 Thread via GitHub


gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1159083568


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -25,17 +25,39 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.ConcurrentHashMap;
 
-/** Used internally by MirrorMaker. Stores offset syncs and performs offset 
translation. */
+/**
+ * Used internally by MirrorMaker. Stores offset syncs and performs offset 
translation.
+ * A limited number of offset syncs can be stored per TopicPartition, in a 
way which provides better translation
+ * later in the topic, closer to the live end of the topic.
+ * This maintains the following invariants for each topic-partition in the 
in-memory sync storage:
+ * 
+ * Invariant A: syncs[0] is the latest offset sync from the syncs 
topic
+ * Invariant B: For each i,j, i <= j: syncs[j].upstream <= 
syncs[i].upstream < syncs[j].upstream + 2^j

Review Comment:
   I split this invariant into two parts: the upper bound and lower bound.
   I also made both of the bounds stronger to promote a more effective 
distribution of syncs.
   
   This means that the old logic of bit comparisons is gone now, and the 
invariants actually drive the algorithm directly, instead of the bit 
comparisons happening to satisfy the invariants but not actually having 
anything to do with them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-03-27 Thread via GitHub


gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1149859768


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +171,103 @@ public void close() {
 protected void handleRecord(ConsumerRecord record) {
 OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
 TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-offsetSyncs.put(sourceTopicPartition, offsetSync);
+offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> 
createInitialSyncs(offsetSync));
+offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> 
updateExistingSyncs(syncs, offsetSync));
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+// Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+// TODO: batch updates so that this copy can be performed less often 
for high-volume sync topics.
+OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+updateSyncArray(mutableSyncs, offsetSync);
+if (log.isTraceEnabled()) {
+StringBuilder stateString = new StringBuilder();
+stateString.append("[");
+for (int i = 0; i < Long.SIZE; i++) {
+if (i != 0) {
+stateString.append(",");
+}
+if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != 
mutableSyncs[i - 1]) {
+// Print only if the sync is interesting, a series of 
repeated syncs will appear as ,
+stateString.append(mutableSyncs[i].upstreamOffset());
+stateString.append(":");
+stateString.append(mutableSyncs[i].downstreamOffset());
+}
+}
+stateString.append("]");
+log.trace("New sync {} applied, new state is {}", offsetSync, 
stateString);

Review Comment:
   I'll keep the if guard and the log in the method itself, but extract the 
stringifying to a new method.



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +171,103 @@ public void close() {
 protected void handleRecord(ConsumerRecord record) {
 OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
 TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-offsetSyncs.put(sourceTopicPartition, offsetSync);
+offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> 
createInitialSyncs(offsetSync));
+offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> 
updateExistingSyncs(syncs, offsetSync));
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+// Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+// TODO: batch updates so that this copy can be performed less often 
for high-volume sync topics.

Review Comment:
   I'm not sure that this is a substantial improvement. Assuming a topic name 
of n bytes, it saves an 8*64 = 512 byte allocation and copy, in addition to the 
existing ~ 32 + 2n allocation that deserializing an offset sync takes. I'll 
make the comment less committal.



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +171,103 @@ public void close() {
 protected void handleRecord(ConsumerRecord record) {
 OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
 TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-offsetSyncs.put(sourceTopicPartition, offsetSync);
+offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> 
createInitialSyncs(offsetSync));
+offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> 
updateExistingSyncs(syncs, offsetSync));
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+// Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+// TODO: batch updates so that this copy can be performed less often 
for high-volume sync topics.
+OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+updateSyncArray(mutableSyncs, offsetSync);
+if (log.isTraceEnabled()) {
+StringBuilder stateString = new StringBuilder();
+stateString.append("[");
+for (int i = 0; i < Long.SIZE; i++) {
+if (i != 0) {
+stateString.append(",");
+}
+if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != 
mutableSyncs[i - 1]) {
+// Print only if the sync is interesting, a series of 
repeated syncs will appear as ,
+