This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new cd7168d50d5 [fix][ml] Don't estimate number of entries when ledgers are empty, return 1 instead (#24125)
cd7168d50d5 is described below

commit cd7168d50d5f58ad93ab414e593aee8dacda1ff5
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Mar 26 08:57:12 2025 +0200

    [fix][ml] Don't estimate number of entries when ledgers are empty, return 1 instead (#24125)
    
    (cherry picked from commit 228a98f853ca225b2f5cd0be7f794bcbd458dd88)
---
 .../mledger/impl/EntryCountEstimator.java          | 25 ++++++++++++---
 .../mledger/impl/EntryCountEstimatorTest.java      | 36 +++++++++++++++++++---
 2 files changed, 52 insertions(+), 9 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java
index f8870373939..e9fe5999dcd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java
@@ -19,7 +19,6 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import static 
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
-import static 
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.DEFAULT_ESTIMATED_ENTRY_SIZE;
 import java.util.Collection;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -97,8 +96,7 @@ class EntryCountEstimator {
 
         long estimatedEntryCount = 0;
         long remainingBytesSize = maxSizeBytes;
-        // Start with a default estimated average size per entry, including 
any overhead
-        long currentAvgSize = DEFAULT_ESTIMATED_ENTRY_SIZE + 
BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+        long currentAvgSize = 0;
         // Get a collection of ledger info starting from the read position
         Collection<MLDataFormats.ManagedLedgerInfo.LedgerInfo> 
ledgersAfterReadPosition =
                 ledgersInfo.tailMap(readPosition.getLedgerId(), true).values();
@@ -159,7 +157,26 @@ class EntryCountEstimator {
 
         // Add any remaining bytes to the estimated entry count considering 
the current average entry size
         if (remainingBytesSize > 0 && estimatedEntryCount < maxEntries) {
-            estimatedEntryCount += remainingBytesSize / currentAvgSize;
+            // need to find the previous non-empty ledger to find the average 
size
+            if (currentAvgSize == 0) {
+                Collection<MLDataFormats.ManagedLedgerInfo.LedgerInfo> 
ledgersBeforeReadPosition =
+                        ledgersInfo.headMap(readPosition.getLedgerId(), 
false).descendingMap().values();
+                for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : 
ledgersBeforeReadPosition) {
+                    long ledgerTotalSize = ledgerInfo.getSize();
+                    long ledgerTotalEntries = ledgerInfo.getEntries();
+                    // Skip processing ledgers that have no entries or size
+                    if (ledgerTotalEntries == 0 || ledgerTotalSize == 0) {
+                        continue;
+                    }
+                    // Update the average entry size based on the current 
ledger's size and entry count
+                    currentAvgSize = Math.max(1, ledgerTotalSize / 
ledgerTotalEntries)
+                            + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+                    break;
+                }
+            }
+            if (currentAvgSize > 0) {
+                estimatedEntryCount += remainingBytesSize / currentAvgSize;
+            }
         }
 
         // Ensure at least one entry is always returned as the result
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java
index a7f4b0e596b..666ff08c315 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java
@@ -19,7 +19,6 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import static 
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
-import static 
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.DEFAULT_ESTIMATED_ENTRY_SIZE;
 import static org.testng.Assert.assertEquals;
 import java.util.HashSet;
 import java.util.NavigableMap;
@@ -31,7 +30,6 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class EntryCountEstimatorTest {
-
     private NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> 
ledgersInfo;
     private Position readPosition;
     private Long lastLedgerId;
@@ -202,11 +200,39 @@ public class EntryCountEstimatorTest {
         beforeLastKey.forEach(ledgersInfo::remove);
         lastLedgerTotalEntries = 0;
         lastLedgerTotalSize = 0;
+        int result = estimateEntryCountByBytesSize(Integer.MAX_VALUE);
+        // expect that result is 1 because the estimation couldn't be done
+        assertEquals(result, 1);
+    }
+
+    @Test
+    public void testWithOnlySecondLastLedgerAndEmptyLastLedger() {
+        readPosition = PositionImpl.LATEST;
+        // remove all but the second last and last ledger
+        long secondLastLedgerId = ledgersInfo.lowerKey(lastLedgerId);
+        Set<Long> beforeSecondLastKey = new 
HashSet<>(ledgersInfo.headMap(secondLastLedgerId).keySet());
+        beforeSecondLastKey.forEach(ledgersInfo::remove);
+        lastLedgerTotalEntries = 0;
+        lastLedgerTotalSize = 0;
+        long expectedEntries = 50;
+        long requiredSize =
+                expectedEntries * (2000 / 150 + 
BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+        int result = estimateEntryCountByBytesSize(requiredSize);
+        assertEquals(result, expectedEntries);
+    }
+
+    @Test
+    public void testWithMultipleEmptyLedgers() {
+        readPosition = PositionImpl.LATEST;
+        long secondLastLedgerId = ledgersInfo.lowerKey(lastLedgerId);
+        MLDataFormats.ManagedLedgerInfo.LedgerInfo secondLastLedgerInfo = 
ledgersInfo.get(secondLastLedgerId);
+        // make the second last ledger empty
+        ledgersInfo.put(secondLastLedgerId, 
secondLastLedgerInfo.toBuilder().setEntries(0).setSize(0).build());
+        lastLedgerTotalEntries = 0;
+        lastLedgerTotalSize = 0;
         long expectedEntries = 50;
-        // when last is empty, DEFAULT_ESTIMATED_ENTRY_SIZE + 
BOOKKEEPER_READ_OVERHEAD_PER_ENTRY is used
-        // for the average size per entry
         long requiredSize =
-                expectedEntries * (DEFAULT_ESTIMATED_ENTRY_SIZE + 
BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+                expectedEntries * (3000 / 200 + 
BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
         int result = estimateEntryCountByBytesSize(requiredSize);
         assertEquals(result, expectedEntries);
     }

Reply via email to