gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1176536050


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
             // Emit synced downstream offset without dead-reckoning
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 
150));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, 
tp, 150));
 
             // Translate exact offsets
             store.sync(tp, 150, 251);
-            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 
150));
+            assertEquals(OptionalLong.of(251), store.translateDownstream(null, 
tp, 150));
 
             // Use old offset (5) prior to any sync -> can't translate
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 
5));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, 
tp, 5));
 
             // Downstream offsets reset
             store.sync(tp, 200, 10);
-            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 
200));
+            assertEquals(OptionalLong.of(10), store.translateDownstream(null, 
tp, 200));
 
             // Upstream offsets reset
             store.sync(tp, 20, 20);
-            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 
20));
+            assertEquals(OptionalLong.of(20), store.translateDownstream(null, 
tp, 20));
         }
     }
 
     @Test
     public void testNoTranslationIfStoreNotStarted() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
 
             // read a sync during startup
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
 
             // After the store is started all offsets are visible
             store.start();
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 
0));
-            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 
100));
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 
200));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, 
tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(null, 
tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, 
tp, 200));
         }
     }
 
     @Test
     public void testNoTranslationIfNoOffsetSync() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             store.start();
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
+        }
+    }
+
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10;
+            int offset = 0;
+            for (; offset <= 1000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 10000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+            // Invariant D: the last sync from the initial read-to-end is still stored
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther apart
+            assertSparseSync(store, 4840, 1000);
+            assertSparseSync(store, 6760, 4840);
+            assertSparseSync(store, 8680, 6760);
+            assertSparseSync(store, 9160, 8680);
+            assertSparseSync(store, 9640, 9160);
+            assertSparseSync(store, 9880, 9640);
+            assertSparseSync(store, 9940, 9880);
+            assertSparseSync(store, 9970, 9940);
+            assertSparseSync(store, 9990, 9970);
+            assertSparseSync(store, 10000, 9990);
+
+            // Rewind upstream offsets should clear all historical syncs
+            store.sync(tp, 1500, 11000);
+            assertSparseSyncInvariant(store, tp);
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, 
tp, 1499));
+            assertEquals(OptionalLong.of(11000), 
store.translateDownstream(null, tp, 1500));
+            assertEquals(OptionalLong.of(11001), 
store.translateDownstream(null, tp, 2000));
+        }
+    }
+
+    @Test
+    public void testKeepMostDistinctSyncs() {
+        // We should not expire more syncs from the store than necessary;
+        // Each new sync should expire at most two other syncs from the cache

Review Comment:
   I think the reason at most two syncs are expired from the cache has to do
with the overall structure of an update operation, which breaks the array into
three logical parts:
   * Beginning to the first distinct sync (while replacement == newest sync)
   * A chain of consecutive distinct syncs which are no longer valid in their 
current position, but are usable in the next position (while replacement == 
oldSync)
   * The first sync which remains valid even with the updated sections (after 
the loop exits)
   
   Syncs are primarily expired at the border of the second and third sections,
as the replacement is dropped to keep the existing sync.
   Syncs are also expired if they fall in the first section, as they aren't
valid in their current position or in the following position.
   If syncs are coming in at a consistent spacing (as with a constant
`offset.lag.max`), then the length of the first section is always the same, and
no syncs are expired in the first section.
   
   This particular test is special because of its consistent sync spacing,
which allows the assertion to be >= 0, as only the section 2/3 border will
expire syncs. In a real scenario with variable sync spacing, the length of the
first section will change, and the number of distinct syncs can decrease.
However, that is due to the loss of those syncs in the sync topic, causing the
sync store to fill in the slots that would be distinct with more copies of the
latest sync.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to