This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new cd7168d50d5 [fix][ml] Don't estimate number of entries when ledgers are empty, return 1 instead (#24125)
cd7168d50d5 is described below
commit cd7168d50d5f58ad93ab414e593aee8dacda1ff5
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Mar 26 08:57:12 2025 +0200
[fix][ml] Don't estimate number of entries when ledgers are empty, return 1 instead (#24125)
(cherry picked from commit 228a98f853ca225b2f5cd0be7f794bcbd458dd88)
---
.../mledger/impl/EntryCountEstimator.java | 25 ++++++++++++---
.../mledger/impl/EntryCountEstimatorTest.java | 36 +++++++++++++++++++---
2 files changed, 52 insertions(+), 9 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java
index f8870373939..e9fe5999dcd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java
@@ -19,7 +19,6 @@
package org.apache.bookkeeper.mledger.impl;
import static
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
-import static
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.DEFAULT_ESTIMATED_ENTRY_SIZE;
import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
@@ -97,8 +96,7 @@ class EntryCountEstimator {
long estimatedEntryCount = 0;
long remainingBytesSize = maxSizeBytes;
- // Start with a default estimated average size per entry, including
any overhead
- long currentAvgSize = DEFAULT_ESTIMATED_ENTRY_SIZE +
BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+ long currentAvgSize = 0;
// Get a collection of ledger info starting from the read position
Collection<MLDataFormats.ManagedLedgerInfo.LedgerInfo>
ledgersAfterReadPosition =
ledgersInfo.tailMap(readPosition.getLedgerId(), true).values();
@@ -159,7 +157,26 @@ class EntryCountEstimator {
// Add any remaining bytes to the estimated entry count considering
the current average entry size
if (remainingBytesSize > 0 && estimatedEntryCount < maxEntries) {
- estimatedEntryCount += remainingBytesSize / currentAvgSize;
+ // need to find the previous non-empty ledger to find the average
size
+ if (currentAvgSize == 0) {
+ Collection<MLDataFormats.ManagedLedgerInfo.LedgerInfo>
ledgersBeforeReadPosition =
+ ledgersInfo.headMap(readPosition.getLedgerId(),
false).descendingMap().values();
+ for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo :
ledgersBeforeReadPosition) {
+ long ledgerTotalSize = ledgerInfo.getSize();
+ long ledgerTotalEntries = ledgerInfo.getEntries();
+ // Skip processing ledgers that have no entries or size
+ if (ledgerTotalEntries == 0 || ledgerTotalSize == 0) {
+ continue;
+ }
+ // Update the average entry size based on the current
ledger's size and entry count
+ currentAvgSize = Math.max(1, ledgerTotalSize /
ledgerTotalEntries)
+ + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+ break;
+ }
+ }
+ if (currentAvgSize > 0) {
+ estimatedEntryCount += remainingBytesSize / currentAvgSize;
+ }
}
// Ensure at least one entry is always returned as the result
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java
index a7f4b0e596b..666ff08c315 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java
@@ -19,7 +19,6 @@
package org.apache.bookkeeper.mledger.impl;
import static
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
-import static
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.DEFAULT_ESTIMATED_ENTRY_SIZE;
import static org.testng.Assert.assertEquals;
import java.util.HashSet;
import java.util.NavigableMap;
@@ -31,7 +30,6 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class EntryCountEstimatorTest {
-
private NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>
ledgersInfo;
private Position readPosition;
private Long lastLedgerId;
@@ -202,11 +200,39 @@ public class EntryCountEstimatorTest {
beforeLastKey.forEach(ledgersInfo::remove);
lastLedgerTotalEntries = 0;
lastLedgerTotalSize = 0;
+ int result = estimateEntryCountByBytesSize(Integer.MAX_VALUE);
+ // expect that result is 1 because the estimation couldn't be done
+ assertEquals(result, 1);
+ }
+
+ @Test
+ public void testWithOnlySecondLastLedgerAndEmptyLastLedger() {
+ readPosition = PositionImpl.LATEST;
+ // remove all but the second last and last ledger
+ long secondLastLedgerId = ledgersInfo.lowerKey(lastLedgerId);
+ Set<Long> beforeSecondLastKey = new
HashSet<>(ledgersInfo.headMap(secondLastLedgerId).keySet());
+ beforeSecondLastKey.forEach(ledgersInfo::remove);
+ lastLedgerTotalEntries = 0;
+ lastLedgerTotalSize = 0;
+ long expectedEntries = 50;
+ long requiredSize =
+ expectedEntries * (2000 / 150 +
BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+ int result = estimateEntryCountByBytesSize(requiredSize);
+ assertEquals(result, expectedEntries);
+ }
+
+ @Test
+ public void testWithMultipleEmptyLedgers() {
+ readPosition = PositionImpl.LATEST;
+ long secondLastLedgerId = ledgersInfo.lowerKey(lastLedgerId);
+ MLDataFormats.ManagedLedgerInfo.LedgerInfo secondLastLedgerInfo =
ledgersInfo.get(secondLastLedgerId);
+ // make the second last ledger empty
+ ledgersInfo.put(secondLastLedgerId,
secondLastLedgerInfo.toBuilder().setEntries(0).setSize(0).build());
+ lastLedgerTotalEntries = 0;
+ lastLedgerTotalSize = 0;
long expectedEntries = 50;
- // when last is empty, DEFAULT_ESTIMATED_ENTRY_SIZE +
BOOKKEEPER_READ_OVERHEAD_PER_ENTRY is used
- // for the average size per entry
long requiredSize =
- expectedEntries * (DEFAULT_ESTIMATED_ENTRY_SIZE +
BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+ expectedEntries * (3000 / 200 +
BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
int result = estimateEntryCountByBytesSize(requiredSize);
assertEquals(result, expectedEntries);
}