gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1149859768


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            StringBuilder stateString = new StringBuilder();
+            stateString.append("[");
+            for (int i = 0; i < Long.SIZE; i++) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                    // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,
+                    stateString.append(mutableSyncs[i].upstreamOffset());
+                    stateString.append(":");
+                    stateString.append(mutableSyncs[i].downstreamOffset());
+                }
+            }
+            stateString.append("]");
+            log.trace("New sync {} applied, new state is {}", offsetSync, 
stateString);

Review Comment:
   I'll keep the if guard and the log in the method itself, but extract the 
stringifying to a new method.
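   
   For illustration, a minimal sketch of that extraction, based only on the quoted code (the helper name `offsetArrayToString` is an assumption, not necessarily what the PR will use):
   ```
   // Hypothetical extraction of the stringifying logic; the isTraceEnabled
   // guard and the log.trace call remain in updateExistingSyncs.
   private String offsetArrayToString(OffsetSync[] syncs) {
       StringBuilder stateString = new StringBuilder();
       stateString.append("[");
       for (int i = 0; i < Long.SIZE; i++) {
           if (i != 0) {
               stateString.append(",");
           }
           if (i == 0 || i == Long.SIZE - 1 || syncs[i] != syncs[i - 1]) {
               // Print only if the sync is interesting; repeated syncs appear as ,,,,,
               stateString.append(syncs[i].upstreamOffset());
               stateString.append(":");
               stateString.append(syncs[i].downstreamOffset());
           }
       }
       stateString.append("]");
       return stateString.toString();
   }
   ```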



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.

Review Comment:
   I'm not sure that this is a substantial improvement. Assuming a topic name of n bytes, it saves an 8*64 = 512 byte allocation and copy, on top of the existing ~32 + 2n byte allocation that deserializing an offset sync already takes. I'll make the comment less committal.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            StringBuilder stateString = new StringBuilder();
+            stateString.append("[");
+            for (int i = 0; i < Long.SIZE; i++) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                    // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,
+                    stateString.append(mutableSyncs[i].upstreamOffset());
+                    stateString.append(":");
+                    stateString.append(mutableSyncs[i].downstreamOffset());
+                }
+            }
+            stateString.append("]");
+            log.trace("New sync {} applied, new state is {}", offsetSync, 
stateString);
+        }
+        return mutableSyncs;
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        for (int i = 0; i < Long.SIZE; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        syncs[0] = offsetSync;
+        for (int i = 1; i < Long.SIZE; i++) {
+            OffsetSync oldValue = syncs[i];
+            long mask = Long.MAX_VALUE << i;
+            // If the old value is too stale: at least one of the 64-i high bits of the offset value have changed
+            // This produces buckets quantized at boundaries of powers of 2
+            // Syncs:     a                  b             c       d   e
+            // Bucket 0  |a|                |b|           |c|     |d| |e|          (size    1)
+            // Bucket 1 | a|               | b|           |c |    |d | e|          (size    2)
+            // Bucket 2 | a  |           |   b|         |  c |  |  d |   |         (size    4)
+            // Bucket 3 | a      |       |   b   |      |  c    |  d     |         (size    8)
+            // Bucket 4 | a              |   b          |  c             |         (size   16)
+            // Bucket 5 | a                             |  c                    |  (size   32)
+            // Bucket 6 | a                                                     |  (size   64)
+            // ...        a                                                          ...
+            // Bucket63   a                                                       (size 2^63)
+            // State after a: [a,,,,,,, ... ,a] (all buckets written)
+            // State after b: [b,,,,,a,, ... ,a] (buckets 0-4 written)
+            // State after c: [c,,,,,,a, ... ,a] (buckets 0-5 written, b expired completely)
+            // State after d: [d,,,,c,,a, ... ,a] (buckets 0-3 written)
+            // State after e: [e,,d,,c,,a, ... ,a] (buckets 0-1 written)

Review Comment:
   I need to re-address the invariants, as they really aren't strong enough and don't carry much explanatory power on their own. I think including them here would be a good idea. It also felt strange putting them up in the class javadoc; maybe they belong down here instead.
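   
   To paraphrase the masking rule from the quoted comment as a standalone predicate (a hedged sketch; `bucketIsStale` is a made-up name, and the real code may be structured differently):
   ```
   // For buckets i >= 1 (the quoted loop starts at 1), Long.MAX_VALUE << i has
   // the 64-i high bits set, so the comparison detects a change in any of them.
   static boolean bucketIsStale(long oldUpstream, long newUpstream, int bucket) {
       long mask = Long.MAX_VALUE << bucket;
       return (oldUpstream & mask) != (newUpstream & mask);
   }
   ```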



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +57,96 @@ public void testOffsetTranslation() {
 
             // Emit synced downstream offset without dead-reckoning
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150));
 
             // Translate exact offsets
             store.sync(tp, 150, 251);
-            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150));
 
             // Use old offset (5) prior to any sync -> can't translate
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5));
 
             // Downstream offsets reset
             store.sync(tp, 200, 10);
-            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200));
 
             // Upstream offsets reset
             store.sync(tp, 20, 20);
-            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+            assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20));
         }
     }
 
     @Test
     public void testNoTranslationIfStoreNotStarted() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // read a sync during startup
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // After the store is started all offsets are visible
             store.start();
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
         }
     }
 
     @Test
     public void testNoTranslationIfNoOffsetSync() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             store.start();
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
         }
     }
+
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10;
+            int offset = 0;
+            for (; offset <= 1000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 2000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther apart
+            assertSparseSync(store, 1030, 1000);
+            assertSparseSync(store, 1540, 1030);
+            assertSparseSync(store, 1800, 1540);
+            assertSparseSync(store, 1920, 1800);
+            assertSparseSync(store, 1990, 1920);
+            assertSparseSync(store, 2000, 1990);

Review Comment:
   > Where do the argument values for these calls come from?
   
   I agree that these numbers are quite opaque looking at them again. Each one is the next multiple of 10 after an offset that is highly divisible by 2:
   ```
   1030 -> 1024 -> 2^10
   1540 -> 1536 -> 2^10 + 2^9
   1800 -> 1792 -> 2^10 + 2^9 + 2^8
   1920 -> 1920 -> 2^10 + 2^9 + 2^8 + 2^7
   1990 -> 1984 -> 2^10 + 2^9 + 2^8 + 2^7 + 2^6
   2000 -> 2000 -> 2^10 + 2^9 + 2^8 + 2^7 + 2^6 +     + 2^4
   ```
   They are multiples of 10 because of maxOffsetLag = 10, which knows nothing about this scheme and instead only emits syncs every 10 offsets.
   
   Also note that the pattern skips 2^5 = 32, because 1984 + 32 = 2016, which is beyond the end of the test.
   
   > Separately but possibly related, would it help to have a method that ensures that the invariants for the syncs stored by the OffsetSyncStore hold?
   
   Thanks for the new assertions! I've added them to the test as you described.
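   
   For reference, a minimal sketch of what such an invariant check could look like, following the two invariants stated in the class javadoc (the helper name `assertSyncInvariants` and the exact assertion structure are my guesses, not the code actually added to the test):
   ```
   import static org.junit.jupiter.api.Assertions.assertTrue;
   
   // Hypothetical test helper: checks that for each i <= j,
   // syncs[j].upstream <= syncs[i].upstream < syncs[j].upstream + 2^j,
   // which also implies syncs[0] carries the largest (latest) upstream offset.
   private static void assertSyncInvariants(OffsetSync[] syncs) {
       for (int j = 0; j < syncs.length; j++) {
           for (int i = 0; i <= j; i++) {
               long newer = syncs[i].upstreamOffset();
               long older = syncs[j].upstreamOffset();
               assertTrue(older <= newer, "Sync array is not monotonic at (" + i + ", " + j + ")");
               if (j < Long.SIZE - 1) { // 1L << 63 would overflow to Long.MIN_VALUE
                   assertTrue(newer < older + (1L << j), "Bucket " + j + " is too stale");
               }
           }
       }
   }
   ```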



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            StringBuilder stateString = new StringBuilder();
+            stateString.append("[");
+            for (int i = 0; i < Long.SIZE; i++) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                    // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,

Review Comment:
   Hmm, I'm on the fence. On one hand, if you eliminate the intervening commas, 
the log message becomes ambiguous as you don't know exactly what the state of 
the array is.
   Currently, you can reconstruct the array exactly from the log message.
   
   However, eliminating the commas does not eliminate any information which 
would make _translation_ ambiguous, since all of the repeated elements have no 
effect. And because the print includes the offsets, you can infer by bit-prefix 
which elements appear where.
   
   If there's ever a bug in the bit masking, the commas would help figure that out. But this log message isn't going to be used by people debugging the bit masking; it's going to be used by people debugging their offset translation. I think it is safe to optimize for those users instead of for myself :)
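   
   For what it's worth, the "reconstruct the array exactly" property could be demonstrated with something like the following (a hypothetical helper, assuming the bracketed upstream:downstream format built by the quoted code, where an empty slot repeats its predecessor):
   ```
   // Rebuilds {upstream, downstream} pairs from a logged state string such as
   // "[2000:2000,,1990:1990]"; an empty slot means "same as the previous slot".
   static long[][] parseLoggedState(String state) {
       String[] slots = state.substring(1, state.length() - 1).split(",", -1);
       long[][] pairs = new long[slots.length][];
       for (int i = 0; i < slots.length; i++) {
           if (slots[i].isEmpty()) {
               pairs[i] = pairs[i - 1]; // slot 0 is always printed, so i > 0 here
           } else {
               String[] offsets = slots[i].split(":");
               pairs[i] = new long[] {Long.parseLong(offsets[0]), Long.parseLong(offsets[1])};
           }
       }
       return pairs;
   }
   ```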



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            StringBuilder stateString = new StringBuilder();
+            stateString.append("[");
+            for (int i = 0; i < Long.SIZE; i++) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                    // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,
+                    stateString.append(mutableSyncs[i].upstreamOffset());
+                    stateString.append(":");
+                    stateString.append(mutableSyncs[i].downstreamOffset());
+                }
+            }
+            stateString.append("]");
+            log.trace("New sync {} applied, new state is {}", offsetSync, 
stateString);
+        }
+        return mutableSyncs;
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        for (int i = 0; i < Long.SIZE; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   I needed a way to guarantee that if the task translates some offset, it's at least as far along in the topic as the previous checkpoint. For two checkpoints emitted in the same task lifetime, the in-memory state in MirrorCheckpointTask is good enough to provide this guarantee.
   
   If we restart the source task it will re-read the offset syncs topic and 
could potentially reconstruct a different in-memory state, both in the 
MirrorCheckpointTask and the OffsetSyncStore. By discarding all of the earlier 
syncs, we ensure that the earliest sync we have in-memory has a higher offset 
than every previous offset sync that the previous lifetime of the 
OffsetSyncStore could have had.
   
   Consider the following situation:
   1. An upstream consumer group is close to the replication flow, everything 
is in steady state. The offset syncs array contains syncs A (oldest), B, and C 
(newest)
   2. A checkpoint is emitted which uses a fine grained offset sync C.
   3. The upstream consumer group stops consuming.
   4. MirrorSourceTask emits offset sync D, and the MirrorCheckpointTask 
discards offset sync C because it is of similar precision.
   5. The OffsetSyncStore translates the consumer group relative to offset sync 
B, but the MirrorCheckpointTask drops this checkpoint because it remembers that 
it previously translated relative to C.
   6. The MirrorCheckpointTask restarts, and in-memory checkpoints are cleared 
and offsets are restored.
   7. The MirrorCheckpointTask now tries to perform a checkpoint.
   
   With the current `!readToEnd` condition, only offset sync D is in-memory, 
and translation is skipped because the consumer group is behind the latest 
restart.
   If we remove the `!readToEnd` condition, the OffsetSyncStore translates the 
consumer group relative to offset sync B, and the MirrorCheckpointTask emits 
the checkpoint because it does not remember the previous checkpoint emitted.
   Thus it would have emitted a checkpoint computed from C, and then one from B, which is non-monotonic.
   
   I mentioned in a comment on KAFKA-14666 that we would have to re-read the checkpoints topic for arbitrarily old translation, and this is why. It can be done; it just represents added complexity that I wanted to avoid.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -25,17 +25,37 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.ConcurrentHashMap;
 
-/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
+/**
+ * Used internally by MirrorMaker. Stores offset syncs and performs offset translation.
+ * <p>A limited number of offset syncs can be stored per TopicPartition, in a way which provides better translation
+ * later in the topic, closer to the live end of the topic.
+ * This maintains the following invariants for each topic-partition in the in-memory sync storage:
+ * <ul>
+ *     <li>syncs[0] is the latest offset sync from the syncs topic</li>
+ *     <li>For each i,j, i <= j: syncs[j].upstream <= syncs[i].upstream < syncs[j].upstream + 2^j</li>
+ * </ul>
+ * <p>Offset translation uses the syncs[i] which most closely precedes the upstream consumer group's current offset.
+ * For a fixed in-memory state, translation of variable upstream offsets will be monotonic.
+ * For variable in-memory state, translation of a fixed upstream offset will not be monotonic.
+ * <p>Translation will be unavailable for all topic-partitions before an initial read-to-end of the offset syncs topic
+ * is complete. Translation will be unavailable after that if no syncs are present for a topic-partition, or if relevant
+ * offset syncs for the topic were eligible for compaction at the time of the initial read-to-end.

Review Comment:
   > it's also possible that replication began from a later point than the consumer group is reading from, which is also a possible reason that translation may be unavailable. Oh, and I guess there's the case where replication began, but was unable to produce offset syncs for whatever reason
   
   Thanks for pointing that out, I missed those conditions.
   
   > does it cover the case where syncs for earlier offsets have been wiped due to compaction? If so, we may want to clarify that offset syncs were "lost due to compaction" instead of "eligible for compaction".
   
   It does cover the cases where syncs are lost to compaction, but "eligible for compaction" includes even more records. This is alluding to the `!readToEnd` condition that you pointed out in another comment.
   
   


