This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 825df41ab7e [fix][broker] Fix `getPositionAfterN` infinite loop.
(#17971)
825df41ab7e is described below
commit 825df41ab7ed592fa5ecb0df156d346dedbb1aed
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu Oct 13 14:46:14 2022 +0800
[fix][broker] Fix `getPositionAfterN` infinite loop. (#17971)
(cherry picked from commit c73285205dafaf5bed827ad99f7fb8edd3ddb7f2)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 17 +++++------------
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 17 +++++++++++++++++
2 files changed, 22 insertions(+), 12 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 09a6fad2243..97b2e793f53 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3172,20 +3172,16 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
long entriesToSkip = n;
long currentLedgerId;
long currentEntryId;
-
if (startRange == PositionBound.startIncluded) {
currentLedgerId = startPosition.getLedgerId();
currentEntryId = startPosition.getEntryId();
} else {
- // e.g. a mark-delete position
PositionImpl nextValidPosition =
getNextValidPosition(startPosition);
currentLedgerId = nextValidPosition.getLedgerId();
currentEntryId = nextValidPosition.getEntryId();
}
-
boolean lastLedger = false;
long totalEntriesInCurrentLedger;
-
while (entriesToSkip >= 0) {
// for the current ledger, the number of entries written is
deduced from the lastConfirmedEntry
// for previous ledgers, LedgerInfo in ZK has the number of entries
@@ -3200,10 +3196,8 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
LedgerInfo ledgerInfo = ledgers.get(currentLedgerId);
totalEntriesInCurrentLedger = ledgerInfo != null ?
ledgerInfo.getEntries() : 0;
}
-
-
- long unreadEntriesInCurrentLedger = totalEntriesInCurrentLedger -
currentEntryId;
-
+ long unreadEntriesInCurrentLedger = totalEntriesInCurrentLedger > 0
+ ? totalEntriesInCurrentLedger - currentEntryId : 0;
if (unreadEntriesInCurrentLedger >= entriesToSkip) {
// if the current ledger has more entries than what we need to
skip
// then the return position is in the same ledger
@@ -3216,11 +3210,10 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
// there are no more ledgers, return the last position
currentEntryId = totalEntriesInCurrentLedger;
break;
- } else {
- Long lid = ledgers.ceilingKey(currentLedgerId + 1);
- currentLedgerId = lid != null ? lid : (ledgers.lastKey() +
1);
- currentEntryId = 0;
}
+ Long lid = ledgers.ceilingKey(currentLedgerId + 1);
+ currentLedgerId = lid != null ? lid : ledgers.lastKey();
+ currentEntryId = 0;
}
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index e03156256f6..3458f51ea92 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2270,6 +2270,23 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
log.info("Target position is {}", targetPosition);
assertEquals(targetPosition.getLedgerId(), secondLedger);
assertEquals(targetPosition.getEntryId(), 4);
+
+ // test for n > NumberOfEntriesInStorage
+ searchPosition = new PositionImpl(secondLedger, 0);
+ targetPosition = managedLedger.getPositionAfterN(searchPosition, 100,
ManagedLedgerImpl.PositionBound.startIncluded);
+ assertEquals(targetPosition.getLedgerId(), secondLedger);
+ assertEquals(targetPosition.getEntryId(), 4);
+
+ // test for startPosition > current ledger
+ searchPosition = new PositionImpl(999, 0);
+ targetPosition = managedLedger.getPositionAfterN(searchPosition, 0,
ManagedLedgerImpl.PositionBound.startIncluded);
+ assertEquals(targetPosition.getLedgerId(), secondLedger);
+ assertEquals(targetPosition.getEntryId(), 4);
+
+ searchPosition = new PositionImpl(999, 0);
+ targetPosition = managedLedger.getPositionAfterN(searchPosition, 10,
ManagedLedgerImpl.PositionBound.startExcluded);
+ assertEquals(targetPosition.getLedgerId(), secondLedger);
+ assertEquals(targetPosition.getEntryId(), 4);
}
@Test