This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 9bc4a2d4d1e KAFKA-15271: HistoricalIterator can expose elements that are too new (#14125) 9bc4a2d4d1e is described below commit 9bc4a2d4d1e9b4ce0f322ec6c604c7b7ac7c7d26 Author: Colin Patrick McCabe <cmcc...@apache.org> AuthorDate: Tue Aug 8 16:36:59 2023 -0700 KAFKA-15271: HistoricalIterator can expose elements that are too new (#14125) A HistoricalIterator at epoch N is supposed to only reveal elements at epoch N or earlier. However, due to a bug, we sometimes will reveal elements which are at a newer epoch than N. The bug does not affect elements that are in the latest epoch (aka topTier). It only affects elements that are newer than N, but which do not persist until the latest epoch. This PR fixes the bug and adds a unit test for this case. Reviewers: David Arthur <mum...@gmail.com> --- .../org/apache/kafka/timeline/SnapshottableHashTable.java | 6 ++++-- .../apache/kafka/timeline/SnapshottableHashTableTest.java | 15 +++++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java b/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java index 9284d5964c5..2977e061643 100644 --- a/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java +++ b/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java @@ -261,8 +261,10 @@ class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEp int tierSlot = slot >>> shift; BaseHashTable.unpackSlot(temp, deltaTable.baseElements(), tierSlot); for (T object : temp) { - if (BaseHashTable.findSlot(object, topTier.length) == slot) { - ready.add(object); + if (object.startEpoch() <= snapshot.epoch()) { + if (BaseHashTable.findSlot(object, topTier.length) == slot) { + ready.add(object); + } } } temp.clear(); diff --git 
a/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java b/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java index 5f85463c15c..2d4a4cf9f04 100644 --- a/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java +++ b/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java @@ -275,6 +275,21 @@ public class SnapshottableHashTableTest { assertIteratorYields(table.snapshottableIterator(Long.MAX_VALUE)); } + @Test + public void testIteratorAtOlderEpoch() { + SnapshotRegistry registry = new SnapshotRegistry(new LogContext()); + SnapshottableHashTable<TestElement> table = + new SnapshottableHashTable<>(registry, 4); + assertNull(table.snapshottableAddOrReplace(E_3B)); + registry.getOrCreateSnapshot(0); + assertNull(table.snapshottableAddOrReplace(E_1A)); + registry.getOrCreateSnapshot(1); + assertEquals(E_1A, table.snapshottableAddOrReplace(E_1B)); + registry.getOrCreateSnapshot(2); + assertEquals(E_1B, table.snapshottableRemove(E_1B)); + assertIteratorYields(table.snapshottableIterator(1), E_3B, E_1A); + } + /** * Assert that the given iterator contains the given elements, in any order. * We compare using reference equality here, rather than object equality.