This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9b70ba303d5 [fix][ml] Fix NoSuchElementException in
EntryCountEstimator caused by a race condition (#25177)
9b70ba303d5 is described below
commit 9b70ba303d5aa65c497bcdc4684cbc9e9017bb42
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Jan 23 05:51:49 2026 +0200
[fix][ml] Fix NoSuchElementException in EntryCountEstimator caused by a
race condition (#25177)
---
.../mledger/impl/EntryCountEstimator.java | 45 ++++++++++++++++------
.../mledger/impl/EntryCountEstimatorTest.java | 40 +++++++++++++++++++
2 files changed, 74 insertions(+), 11 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java
index 511379a7ae5..a5d41bad6a8 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java
@@ -22,6 +22,7 @@ import static
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKK
import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.NoSuchElementException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
@@ -82,17 +83,15 @@ class EntryCountEstimator {
return maxEntries;
}
- // Adjust the read position to ensure it falls within the valid range
of available ledgers.
- // This handles special cases such as EARLIEST and LATEST positions by
resetting them
- // to the first available ledger or the last active ledger,
respectively.
- if (lastLedgerId != null && readPosition.getLedgerId() >
lastLedgerId.longValue()) {
- readPosition = PositionFactory.create(lastLedgerId,
Math.max(lastLedgerTotalEntries - 1, 0));
- } else if (lastLedgerId == null && readPosition.getLedgerId() >
ledgersInfo.lastKey()) {
- Map.Entry<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>
lastEntry = ledgersInfo.lastEntry();
- readPosition =
- PositionFactory.create(lastEntry.getKey(),
Math.max(lastEntry.getValue().getEntries() - 1, 0));
- } else if (readPosition.getLedgerId() < ledgersInfo.firstKey()) {
- readPosition = PositionFactory.create(ledgersInfo.firstKey(), 0);
+ if (ledgersInfo.isEmpty()) {
+ return 1;
+ }
+
+ try {
+ readPosition = adjustReadPosition(readPosition, ledgersInfo,
lastLedgerId, lastLedgerTotalEntries);
+ } catch (NoSuchElementException e) {
+ // there was a race condition where ledgersInfo became empty just
before adjustReadPosition was called
+ return 1;
}
long estimatedEntryCount = 0;
@@ -183,4 +182,28 @@ class EntryCountEstimator {
// Ensure at least one entry is always returned as the result
return Math.max((int) Math.min(estimatedEntryCount, maxEntries), 1);
}
+
+ private static Position adjustReadPosition(Position readPosition,
+ NavigableMap<Long,
MLDataFormats.ManagedLedgerInfo.LedgerInfo>
+ ledgersInfo,
+ Long lastLedgerId, long
lastLedgerTotalEntries) {
+ // Adjust the read position to ensure it falls within the valid range
of available ledgers.
+ // This handles special cases such as EARLIEST and LATEST positions by
resetting them
+ // to the first available ledger or the last active ledger,
respectively.
+ if (lastLedgerId != null && readPosition.getLedgerId() >
lastLedgerId.longValue()) {
+ return PositionFactory.create(lastLedgerId,
Math.max(lastLedgerTotalEntries - 1, 0));
+ }
+ long lastKey = ledgersInfo.lastKey();
+ if (lastLedgerId == null && readPosition.getLedgerId() > lastKey) {
+ Map.Entry<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>
lastEntry = ledgersInfo.lastEntry();
+ if (lastEntry != null && lastEntry.getKey() == lastKey) {
+ return PositionFactory.create(lastEntry.getKey(),
Math.max(lastEntry.getValue().getEntries() - 1, 0));
+ }
+ }
+ long firstKey = ledgersInfo.firstKey();
+ if (readPosition.getLedgerId() < firstKey) {
+ return PositionFactory.create(firstKey, 0);
+ }
+ return readPosition;
+ }
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java
index c1c1b8dd2c1..1b7bcbf816d 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java
@@ -19,9 +19,12 @@
package org.apache.bookkeeper.mledger.impl;
import static
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import java.util.HashSet;
import java.util.NavigableMap;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import org.apache.bookkeeper.mledger.Position;
@@ -289,4 +292,41 @@ public class EntryCountEstimatorTest {
int result = estimateEntryCountByBytesSize(Long.MAX_VALUE);
assertEquals(result, maxEntries);
}
+
+ @Test
+ public void testNoLedgers() {
+ readPosition = PositionFactory.EARLIEST;
+ // remove all ledgers from ledgersInfo
+ ledgersInfo.clear();
+ int result = estimateEntryCountByBytesSize(5_000_000);
+ // expect that result is 1 because the estimation couldn't be done
+ assertEquals(result, 1);
+ }
+
+ @Test
+ public void testNoLedgersRaceFirstKey() {
+ readPosition = PositionFactory.EARLIEST;
+ // remove all ledgers from ledgersInfo
+ ledgersInfo = mock(NavigableMap.class);
+ when(ledgersInfo.isEmpty()).thenReturn(false);
+ when(ledgersInfo.firstKey()).thenThrow(NoSuchElementException.class);
+ when(ledgersInfo.lastKey()).thenReturn(1L);
+ int result = estimateEntryCountByBytesSize(5_000_000);
+ // expect that result is 1 because the estimation couldn't be done
+ assertEquals(result, 1);
+ }
+
+ @Test
+ public void testNoLedgersRaceLastKey() {
+ readPosition = PositionFactory.EARLIEST;
+ // remove all ledgers from ledgersInfo
+ ledgersInfo = mock(NavigableMap.class);
+ lastLedgerId = null;
+ when(ledgersInfo.isEmpty()).thenReturn(false);
+ when(ledgersInfo.firstKey()).thenReturn(1L);
+ when(ledgersInfo.lastKey()).thenThrow(NoSuchElementException.class);
+ int result = estimateEntryCountByBytesSize(5_000_000);
+ // expect that result is 1 because the estimation couldn't be done
+ assertEquals(result, 1);
+ }
}
\ No newline at end of file