C0urante commented on code in PR #14156: URL: https://github.com/apache/kafka/pull/14156#discussion_r1287870397
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ########## @@ -227,7 +227,7 @@ private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { } } - private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { + private void updateSyncArray(OffsetSync[] syncs, OffsetSync[] original, OffsetSync offsetSync) { Review Comment: Nit: feels a little strange that we're passing in a to-be-mutated sync array. Any reason not to alter `updateSyncArray` to only take in the original sync array and the new sync, and construct and return the new sync array? ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ########## @@ -153,31 +156,84 @@ public void testPastOffsetTranslation() { } @Test - public void testKeepMostDistinctSyncs() { + public void testConsistentlySpacedSyncs() { // Under normal operation, the incoming syncs will be regularly spaced and the store should keep a set of syncs // which provide the best translation accuracy (expires as few syncs as possible) - // Each new sync should be added to the cache and expire at most one other sync from the cache - long iterations = 10000; + long iterations = 100; long maxStep = Long.MAX_VALUE / iterations; // Test a variety of steps (corresponding to the offset.lag.max configuration) for (long step = 1; step < maxStep; step = (step * 2) + 1) { for (long firstOffset = 0; firstOffset < 30; firstOffset++) { - try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { - int lastCount = 1; - store.start(); - for (long offset = firstOffset; offset <= iterations; offset += step) { - store.sync(tp, offset, offset); - // Invariant A: the latest sync is present - assertEquals(offset, store.syncFor(tp, 0).upstreamOffset()); - // Invariant D: the earliest sync is present - assertEquals(firstOffset, store.syncFor(tp, 63).upstreamOffset()); - int count = countDistinctStoredSyncs(store, tp); - int diff = count - lastCount; - 
assertTrue(diff >= 0, - "Store expired too many syncs: " + diff + " after receiving offset " + offset); - lastCount = count; - } - } + long finalStep = step; + // Generate a stream of consistently spaced syncs + // Each new sync should be added to the cache and expire at most one other sync from the cache + assertSyncSpacingHasBoundedExpirations(firstOffset, LongStream.generate(() -> finalStep).limit(iterations), 1); + } + } + } + + @Test + public void testRandomlySpacedSyncs() { + Random random = new Random(0L); // arbitrary but deterministic seed + int iterationBits = 10; + long iterations = 1 << iterationBits; + for (int n = 1; n < Long.SIZE - iterationBits; n++) { + // A stream with at most n bits of difference between the largest and smallest steps + // will expire n + 2 syncs at once in the worst case, because the sync store is laid out exponentially. + long maximumDifference = 1L << n; + int maximumExpirations = n + 2; + assertSyncSpacingHasBoundedExpirations(0, random.longs(iterations, 0L, maximumDifference), maximumExpirations); + // This holds true even if there is a larger minimum step size, such as caused by offsetLagMax + long offsetLagMax = 1L << 16; + assertSyncSpacingHasBoundedExpirations(0, random.longs(iterations, offsetLagMax, offsetLagMax + maximumDifference), maximumExpirations); + } + } + + @Test + public void testDroppedSyncsSpacing() { + Random random = new Random(0L); // arbitrary but deterministic seed + long iterations = 10000; + long offsetLagMax = 100; + // Half of the gaps will be offsetLagMax, and half will be double that, as if one intervening sync was dropped. + LongStream stream = random.doubles() + .mapToLong(d -> (d < 0.5 ? 2 : 1) * offsetLagMax) + .limit(iterations); + // This will cause up to 2 syncs to be discarded, because a sequence of two adjacent syncs followed by a + // dropped sync will set up the following situation + // before [d....d,c,b,a....] + // after [e......e,d,a....] 
+ // and syncs b and c are discarded to make room for e and the demoted sync d. + assertSyncSpacingHasBoundedExpirations(0, stream, 2); + } + + /** + * Simulate an OffsetSyncStore receiving a sequence of offset syncs as defined by their start offset and gaps. + * After processing each simulated sync, assert that the store has not expired more unique syncs than the bound. + * @param firstOffset First offset to give to the sync store after starting + * @param steps A finite stream of gaps between syncs with some known distribution + * @param maximumExpirations The maximum number of distinct syncs allowed to be expired after a single update. + */ + private void assertSyncSpacingHasBoundedExpirations(long firstOffset, LongStream steps, int maximumExpirations) { + try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { + store.start(); + PrimitiveIterator.OfLong iterator = steps.iterator(); + long offset = firstOffset; + int lastCount = 1; + while (iterator.hasNext()) { + store.sync(tp, offset, offset); + // Invariant A: the latest sync is present + assertEquals(offset, store.syncFor(tp, 0).upstreamOffset()); + // Invariant D: the earliest sync is present + assertEquals(firstOffset, store.syncFor(tp, 63).upstreamOffset()); + int count = countDistinctStoredSyncs(store, tp); + // We are adding one sync, so if the count didn't change, then exactly one sync expired. 
+ int expiredSyncs = lastCount - count + 1; + assertTrue(expiredSyncs <= maximumExpirations, + "Store expired too many syncs: " + expiredSyncs + " > " + maximumExpirations + + " after receiving offset " + offset); + lastCount = count; + offset += iterator.nextLong(); Review Comment: I think we could address this pretty easily if we dropped our assertions for the case where the store has just started and has only synced `firstOffset`: ```java try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { store.start(); store.sync(tp, firstOffset, firstOffset); PrimitiveIterator.OfLong iterator = steps.iterator(); long offset = firstOffset; int lastCount = 1; while (iterator.hasNext()) { offset += iterator.nextLong(); assertTrue(offset >= 0, "Test is invalid, offset overflowed"); store.sync(tp, offset, offset); // ... ``` ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ########## @@ -153,31 +156,84 @@ public void testPastOffsetTranslation() { } @Test - public void testKeepMostDistinctSyncs() { + public void testConsistentlySpacedSyncs() { // Under normal operation, the incoming syncs will be regularly spaced and the store should keep a set of syncs // which provide the best translation accuracy (expires as few syncs as possible) - // Each new sync should be added to the cache and expire at most one other sync from the cache - long iterations = 10000; + long iterations = 100; long maxStep = Long.MAX_VALUE / iterations; // Test a variety of steps (corresponding to the offset.lag.max configuration) for (long step = 1; step < maxStep; step = (step * 2) + 1) { for (long firstOffset = 0; firstOffset < 30; firstOffset++) { - try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { - int lastCount = 1; - store.start(); - for (long offset = firstOffset; offset <= iterations; offset += step) { - store.sync(tp, offset, offset); - // Invariant A: the latest sync is present - assertEquals(offset, store.syncFor(tp, 
0).upstreamOffset()); - // Invariant D: the earliest sync is present - assertEquals(firstOffset, store.syncFor(tp, 63).upstreamOffset()); - int count = countDistinctStoredSyncs(store, tp); - int diff = count - lastCount; - assertTrue(diff >= 0, - "Store expired too many syncs: " + diff + " after receiving offset " + offset); - lastCount = count; - } - } + long finalStep = step; + // Generate a stream of consistently spaced syncs + // Each new sync should be added to the cache and expire at most one other sync from the cache + assertSyncSpacingHasBoundedExpirations(firstOffset, LongStream.generate(() -> finalStep).limit(iterations), 1); + } + } + } + + @Test + public void testRandomlySpacedSyncs() { + Random random = new Random(0L); // arbitrary but deterministic seed + int iterationBits = 10; + long iterations = 1 << iterationBits; + for (int n = 1; n < Long.SIZE - iterationBits; n++) { + // A stream with at most n bits of difference between the largest and smallest steps + // will expire n + 2 syncs at once in the worst case, because the sync store is laid out exponentially. + long maximumDifference = 1L << n; + int maximumExpirations = n + 2; + assertSyncSpacingHasBoundedExpirations(0, random.longs(iterations, 0L, maximumDifference), maximumExpirations); + // This holds true even if there is a larger minimum step size, such as caused by offsetLagMax + long offsetLagMax = 1L << 16; + assertSyncSpacingHasBoundedExpirations(0, random.longs(iterations, offsetLagMax, offsetLagMax + maximumDifference), maximumExpirations); + } + } + + @Test + public void testDroppedSyncsSpacing() { + Random random = new Random(0L); // arbitrary but deterministic seed + long iterations = 10000; + long offsetLagMax = 100; + // Half of the gaps will be offsetLagMax, and half will be double that, as if one intervening sync was dropped. + LongStream stream = random.doubles() + .mapToLong(d -> (d < 0.5 ? 
2 : 1) * offsetLagMax) + .limit(iterations); + // This will cause up to 2 syncs to be discarded, because a sequence of two adjacent syncs followed by a + // dropped sync will set up the following situation + // before [d....d,c,b,a....] + // after [e......e,d,a....] + // and syncs b and c are discarded to make room for e and the demoted sync d. + assertSyncSpacingHasBoundedExpirations(0, stream, 2); + } + + /** + * Simulate an OffsetSyncStore receiving a sequence of offset syncs as defined by their start offset and gaps. + * After processing each simulated sync, assert that the store has not expired more unique syncs than the bound. + * @param firstOffset First offset to give to the sync store after starting + * @param steps A finite stream of gaps between syncs with some known distribution + * @param maximumExpirations The maximum number of distinct syncs allowed to be expired after a single update. + */ + private void assertSyncSpacingHasBoundedExpirations(long firstOffset, LongStream steps, int maximumExpirations) { + try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { + store.start(); + PrimitiveIterator.OfLong iterator = steps.iterator(); + long offset = firstOffset; + int lastCount = 1; + while (iterator.hasNext()) { + store.sync(tp, offset, offset); + // Invariant A: the latest sync is present + assertEquals(offset, store.syncFor(tp, 0).upstreamOffset()); + // Invariant D: the earliest sync is present + assertEquals(firstOffset, store.syncFor(tp, 63).upstreamOffset()); + int count = countDistinctStoredSyncs(store, tp); + // We are adding one sync, so if the count didn't change, then exactly one sync expired. 
+ int expiredSyncs = lastCount - count + 1; + assertTrue(expiredSyncs <= maximumExpirations, + "Store expired too many syncs: " + expiredSyncs + " > " + maximumExpirations + + " after receiving offset " + offset); + lastCount = count; + offset += iterator.nextLong(); Review Comment: Doesn't this technically omit the final offset? After the last `iterator.nextLong()` increments `offset`, we return to the top of the while loop, `iterator.hasNext()` returns false, and that final offset is never passed to `store.sync` or checked by the assertions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org