This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push: new cbe847c457f KAFKA-15271: Historicalterator can exposes elements that are too new (#14125) cbe847c457f is described below commit cbe847c457f4c372056e6644ba6493942a83bee6 Author: Colin Patrick McCabe <cmcc...@apache.org> AuthorDate: Tue Aug 8 16:36:59 2023 -0700 KAFKA-15271: Historicalterator can exposes elements that are too new (#14125) A HistoricalIterator at epoch N is supposed to only reveal elements at epoch N or earlier. However, due to a bug, we sometimes will reveal elements which are at a newer epoch than N. The bug does not affect elements that are in the latest epoch (aka topTier). It only affects elements that are newer than N, but which do not persist until the latest epoch. This PR fixes the bug and adds a unit test for this case. Reviewers: David Arthur <mum...@gmail.com> --- .../org/apache/kafka/timeline/SnapshottableHashTable.java | 6 ++++-- .../apache/kafka/timeline/SnapshottableHashTableTest.java | 15 +++++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java b/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java index 9284d5964c5..2977e061643 100644 --- a/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java +++ b/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java @@ -261,8 +261,10 @@ class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEp int tierSlot = slot >>> shift; BaseHashTable.unpackSlot(temp, deltaTable.baseElements(), tierSlot); for (T object : temp) { - if (BaseHashTable.findSlot(object, topTier.length) == slot) { - ready.add(object); + if (object.startEpoch() <= snapshot.epoch()) { + if (BaseHashTable.findSlot(object, topTier.length) == slot) { + ready.add(object); + } } } temp.clear(); diff --git a/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java b/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java index 01ea35935cd..8846262c2c5 100644 --- a/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java +++ b/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java @@ -273,6 +273,21 @@ public class SnapshottableHashTableTest { assertIteratorYields(table.snapshottableIterator(Long.MAX_VALUE)); } + @Test + public void testIteratorAtOlderEpoch() { + SnapshotRegistry registry = new SnapshotRegistry(new LogContext()); + SnapshottableHashTable<TestElement> table = + new SnapshottableHashTable<>(registry, 4); + assertNull(table.snapshottableAddOrReplace(E_3B)); + registry.getOrCreateSnapshot(0); + assertNull(table.snapshottableAddOrReplace(E_1A)); + registry.getOrCreateSnapshot(1); + assertEquals(E_1A, table.snapshottableAddOrReplace(E_1B)); + registry.getOrCreateSnapshot(2); + assertEquals(E_1B, table.snapshottableRemove(E_1B)); + assertIteratorYields(table.snapshottableIterator(1), E_3B, E_1A); + } + /** * Assert that the given iterator contains the given elements, in any order. * We compare using reference equality here, rather than object equality.