This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ed31d82ccca [fix][broker]Wrong backlog: expected 0 but got 1 (#24938)
ed31d82ccca is described below
commit ed31d82ccca0fed50a3977786f5b7ac306af7dbc
Author: fengyubiao <[email protected]>
AuthorDate: Mon Nov 17 03:24:48 2025 +0800
[fix][broker]Wrong backlog: expected 0 but got 1 (#24938)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 12 +-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 98 ++++--
.../mledger/impl/ManagedCursorListAckTest.java | 2 +-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 331 +++++++++++++++++++++
.../client/api/SimpleProducerConsumerTest.java | 47 +++
5 files changed, 467 insertions(+), 23 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index d32f0c8e998..e1ee50aad0f 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1489,12 +1489,16 @@ public class ManagedCursorImpl implements ManagedCursor
{
// modify mark delete and read position since we are able to
persist new position for cursor
lock.writeLock().lock();
try {
- if (markDeletePosition.compareTo(newMarkDeletePosition) >=
0) {
+ // Correct the variable "messagesConsumedCounter".
+ // BTW, no need to change "messagesConsumedCounter" if new
"markDeletePosition" is the same as the
+ // old one.
+ int compareRes =
ledger.comparePositions(markDeletePosition, newMarkDeletePosition);
+ if (compareRes > 0) {
MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(),
-getNumberOfEntries(
- Range.closedOpen(newMarkDeletePosition,
markDeletePosition)));
- } else {
+ Range.openClosed(newMarkDeletePosition,
markDeletePosition)));
+ } else if (compareRes < 0) {
MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(),
getNumberOfEntries(
- Range.closedOpen(markDeletePosition,
newMarkDeletePosition)));
+ Range.openClosed(markDeletePosition,
newMarkDeletePosition)));
}
markDeletePosition = newMarkDeletePosition;
lastMarkDeleteEntry = new
MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor()
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 17ffecb5617..a42974f9b2c 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3868,6 +3868,32 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
});
}
+ /**
+ * Compare two positions. It differs from {@link
Position#compareTo(Position)} when the params are invalid.
+ * For example: if position-1 is "1:{latest entry}" and position-2 is
"2:-1", they are treated as the same position.
+ */
+ @VisibleForTesting
+ int comparePositions(Position pos1, Position pos2) {
+ if (pos1 == null || pos2 == null) {
+ throw new IllegalArgumentException("Positions must not be null");
+ }
+ if (ledgers.isEmpty() || pos1.getLedgerId() <
getFirstPosition().getLedgerId()
+ || pos2.getLedgerId() < getFirstPosition().getLedgerId()
+ || pos1.getLedgerId() > getLastPosition().getLedgerId()
+ || pos2.getLedgerId() > getLastPosition().getLedgerId()) {
+ log.warn("[{}] Comparing un-exist position {} and {}", name, pos1,
pos2,
+ new IllegalArgumentException("Comparing un-exist
position"));
+ return pos1.compareTo(pos2);
+ }
+ if (pos1.getLedgerId() == pos2.getLedgerId()) {
+ return Long.compare(pos1.getEntryId(), pos2.getEntryId());
+ }
+ if (!isValidPosition(pos1) || !isValidPosition(pos2)) {
+ return
getNextValidPosition(pos1).compareTo(getNextValidPosition(pos2));
+ }
+ return pos1.compareTo(pos2);
+ }
+
/**
* Get the number of entries between a contiguous range of two positions.
*
@@ -3880,41 +3906,77 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED;
Position toPosition = range.upperEndpoint();
boolean toIncluded = range.upperBoundType() == BoundType.CLOSED;
+ if (comparePositions(fromPosition, toPosition) > 0) {
+ log.warn("[{}] Getting number of entries with an invalid range {}
and {}", name, fromPosition, toPosition);
+ throw new IllegalArgumentException("Invalid range " + range);
+ }
+
+ // 1. If the "fromPosition" is after "toPosition", then there is no
entry in the range.
+ // 2. If both "fromPosition" and "toPosition" have a negative entry id
and are in the same ledger, then there is no
+ // entry in the range.
+ if (fromPosition.getLedgerId() > toPosition.getLedgerId()
+ || (fromPosition.getLedgerId() == toPosition.getLedgerId()
+ && fromPosition.getEntryId() > toPosition.getEntryId())
+ || (fromPosition.getLedgerId() == toPosition.getLedgerId()
+ && fromPosition.getEntryId() < 0 && toPosition.getEntryId() <
0)) {
+ return 0;
+ }
+ // If the 2 positions are in the same ledger.
if (fromPosition.getLedgerId() == toPosition.getLedgerId()) {
LedgerInfo li = ledgers.get(toPosition.getLedgerId());
if (li != null) {
// If the 2 positions are in the same ledger
long count = toPosition.getEntryId() -
fromPosition.getEntryId() - 1;
- count += fromIncluded ? 1 : 0;
- count += toIncluded ? 1 : 0;
+ count += fromIncluded && fromPosition.getEntryId() >= 0 ? 1 :
0;
+ count += toIncluded && toPosition.getEntryId() >= 0 ? 1 : 0;
return count;
} else {
// if the ledgerId is not in the ledgers, it means it has been
deleted
return 0;
}
- } else {
- long count = 0;
- // If the from & to are pointing to different ledgers, then we
need to :
- // 1. Add the entries in the ledger pointed by toPosition
- count += toPosition.getEntryId();
+ }
+
+ // If the "fromPosition.ledgerId" is smaller than "toPosition.ledgerId".
+ // 1. Add the entries in the ledger pointed by toPosition.
+ // 2. Add the entries in the ledger pointed by fromPosition.
+ // 3. Add the whole ledgers entries in between.
+ long count = 0;
+
+ // 1. Add the entries in the ledger pointed by toPosition.
+ // Add nothing if "toPosition" does not exist in "ledgers".
+ // Add nothing if "toPosition.entryId < 0".
+ LedgerInfo toLedger = ledgers.get(toPosition.getLedgerId());
+ if (toPosition.getEntryId() >= 0 && toLedger != null) {
+ // To support the use case "cursor.getNumberOfEntries()", which
will use a "toPosition" that is larger
+ // than the LAC.
+ // To support this case, use "Long.MAX_VALUE" if the ledger is the
last one.
+ long entriesInLedger = comparePositions(toPosition,
lastConfirmedEntry) >= 0
+ ? Long.MAX_VALUE : toLedger.getEntries();
+ count += Math.min(toPosition.getEntryId(), entriesInLedger - 1);
count += toIncluded ? 1 : 0;
+ }
- // 2. Add the entries in the ledger pointed by fromPosition
- LedgerInfo li = ledgers.get(fromPosition.getLedgerId());
- if (li != null) {
- count += li.getEntries() - (fromPosition.getEntryId() + 1);
+ // 2. Add the entries in the ledger pointed by fromPosition.
+ // Add the whole ledger if "fromPosition.entryId < 0".
+ // Add nothing if "fromPosition" does not exist in "ledgers".
+ LedgerInfo formLedger = ledgers.get(fromPosition.getLedgerId());
+ if (formLedger != null) {
+ if (fromPosition.getEntryId() < 0) {
+ count += formLedger.getEntries();
+ } else {
+ count += formLedger.getEntries() - (fromPosition.getEntryId()
+ 1);
count += fromIncluded ? 1 : 0;
}
+ }
- // 3. Add the whole ledgers entries in between
- for (LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(),
false, toPosition.getLedgerId(), false)
- .values()) {
- count += ls.getEntries();
- }
-
- return count;
+ // 3. Add the whole ledgers entries in between
+ for (LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(), false,
toPosition.getLedgerId(), false)
+ .values()) {
+ count += ls.getEntries();
}
+
+ return count;
}
/**
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
index c4d3b076ba3..a4895b2624b 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
@@ -33,7 +33,7 @@ public class ManagedCursorListAckTest extends
MockedBookKeeperTestCase {
private static final Charset Encoding = StandardCharsets.UTF_8;
- @Test(timeOut = 20000)
+ @Test(timeOut = 20000 * 1000)
void testMultiPositionDelete() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new
ManagedLedgerConfig().setMaxEntriesPerLedger(2));
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index eb65cbd2c22..9216bd60ed4 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -4774,4 +4774,335 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
cursor.close();
ledger.close();
}
+
+ @Test
+ public void testGetNumberOfEntriesWithRangeParam() throws Exception {
+ final String ledgerName = "ml_" +
UUID.randomUUID().toString().replaceAll("-", "");
+ final String cursorName = "test-cursor";
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(ledgerName,
config);
+ // Create a cursor to avoid entries being trimmed.
+ ml.openCursor(cursorName);
+ int totalEntries = 35;
+ List<Position> positions = new ArrayList<>(totalEntries);
+ for (int i = 0; i < totalEntries; i++) {
+ Position pos = ml.addEntry(("entry-" + i).getBytes());
+ positions.add(pos);
+ }
+ Iterator<LedgerInfo> iterator =
ml.getLedgersInfo().values().iterator();
+ LedgerInfo ledger1 = iterator.next();
+ LedgerInfo ledger2 = iterator.next();
+ LedgerInfo ledger3 = iterator.next();
+ LedgerInfo ledger4 = iterator.next();
+ assertEquals(ledger1.getEntries(), 10);
+ assertEquals(ledger2.getEntries(), 10);
+ assertEquals(ledger3.getEntries(), 10);
+ assertEquals(ledger4.getLedgerId(), ml.getCurrentLedger().getId());
+
+ // Normal case: same ledger.
+ Range<Position> range11 = Range.closed(positions.get(0),
positions.get(9));
+ assertEquals(ml.getNumberOfEntries(range11), 10);
+ Range<Position> range12 = Range.openClosed(positions.get(1),
positions.get(9));
+ assertEquals(ml.getNumberOfEntries(range12), 8);
+ Range<Position> range13 = Range.closedOpen(positions.get(2),
positions.get(9));
+ assertEquals(ml.getNumberOfEntries(range13), 7);
+
+ // Normal case: crosses ledgers.
+ Range<Position> range21 = Range.closed(positions.get(0),
positions.get(19));
+ assertEquals(ml.getNumberOfEntries(range21), 20);
+ Range<Position> range22 = Range.openClosed(positions.get(0),
positions.get(19));
+ assertEquals(ml.getNumberOfEntries(range22), 19);
+ Range<Position> range23 = Range.closedOpen(positions.get(0),
positions.get(19));
+ assertEquals(ml.getNumberOfEntries(range23), 19);
+ Range<Position> range24 = Range.closed(positions.get(0),
positions.get(29));
+ assertEquals(ml.getNumberOfEntries(range24), 30);
+ Range<Position> range25 = Range.openClosed(positions.get(0),
positions.get(29));
+ assertEquals(ml.getNumberOfEntries(range25), 29);
+ Range<Position> range26 = Range.closedOpen(positions.get(0),
positions.get(29));
+ assertEquals(ml.getNumberOfEntries(range26), 29);
+
+ // Normal case: end with current ledger.
+ Range<Position> range27 = Range.closed(positions.get(0),
positions.get(34));
+ assertEquals(ml.getNumberOfEntries(range27), 35);
+ // Cover the following case.
+ // The use case "cursor.getNumberOfEntries()", which will use a
"toPosition" with an entry
+ // id that is larger than the LAC.
+ Range<Position> range28 = Range.closed(positions.get(0),
PositionFactory.create(ledger4.getLedgerId(), 100));
+ assertEquals(ml.getNumberOfEntries(range28), 131);
+
+ // From position that entry id is "-1" & positions in the same ledger.
+ Range<Position> range31 =
Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1),
+ positions.get(9));
+ assertEquals(ml.getNumberOfEntries(range31), 10);
+ Range<Position> range32 =
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1),
+ positions.get(9));
+ assertEquals(ml.getNumberOfEntries(range32), 10);
+ Range<Position> range33 =
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1),
+ positions.get(9));
+ assertEquals(ml.getNumberOfEntries(range33), 9);
+
+ // From position that entry id is "-1" & crosses ledgers.
+ Range<Position> range41 =
Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1),
+ positions.get(15));
+ assertEquals(ml.getNumberOfEntries(range41), 16);
+ Range<Position> range42 =
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1),
+ positions.get(15));
+ assertEquals(ml.getNumberOfEntries(range42), 16);
+ Range<Position> range43 =
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1),
+ positions.get(15));
+ assertEquals(ml.getNumberOfEntries(range43), 15);
+ Range<Position> range44 =
Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1),
+ positions.get(25));
+ assertEquals(ml.getNumberOfEntries(range44), 26);
+ Range<Position> range45 =
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1),
+ positions.get(25));
+ assertEquals(ml.getNumberOfEntries(range45), 26);
+ Range<Position> range46 =
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1),
+ positions.get(25));
+ assertEquals(ml.getNumberOfEntries(range46), 25);
+
+ // Invalid range.
+ try {
+ Range.closed(positions.get(1),
PositionFactory.create(ledger1.getLedgerId(), -1));
+ fail("Should have failed because the range is invalid.");
+ } catch (IllegalArgumentException ex) {
+ assertTrue(ex.getMessage().contains("Invalid range"));
+ }
+ try {
+ Range.closed(positions.get(29), positions.get(0));
+ fail("Should have failed because the range is invalid.");
+ } catch (IllegalArgumentException ex) {
+ assertTrue(ex.getMessage().contains("Invalid range"));
+ }
+
+ // "To position" that entry id is "-1" & crosses ledgers.
+ Range<Position> range61 = Range.closed(positions.get(1),
PositionFactory.create(ledger2.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range61), 9);
+ Range<Position> range62 = Range.closedOpen(positions.get(1),
PositionFactory.create(ledger2.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range62), 9);
+ Range<Position> range63 = Range.openClosed(positions.get(1),
PositionFactory.create(ledger2.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range63), 8);
+ Range<Position> range64 = Range.closed(positions.get(1),
PositionFactory.create(ledger3.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range64), 19);
+ Range<Position> range65 = Range.closedOpen(positions.get(1),
PositionFactory.create(ledger3.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range65), 19);
+ Range<Position> range66 = Range.openClosed(positions.get(1),
PositionFactory.create(ledger3.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range66), 18);
+
+ // "From position" is the latest entry of a ledger.
+ Range<Position> range71 =
Range.closed(PositionFactory.create(ledger1.getLedgerId(), 9),
positions.get(10));
+ assertEquals(ml.getNumberOfEntries(range71), 2);
+ Range<Position> range72 =
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), 9),
positions.get(10));
+ assertEquals(ml.getNumberOfEntries(range72), 1);
+ Range<Position> range73 =
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), 9),
positions.get(10));
+ assertEquals(ml.getNumberOfEntries(range73), 1);
+
+ // "From position" is the latest entry of a ledger, and "to position"
has a negative entry id.
+ Range<Position> range81 =
Range.closed(PositionFactory.create(ledger1.getLedgerId(), 9),
+ PositionFactory.create(ledger2.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range81), 1);
+ Range<Position> range82 =
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), 9),
+ PositionFactory.create(ledger2.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range82), 0);
+ Range<Position> range83 =
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), 9),
+ PositionFactory.create(ledger2.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range83), 1);
+
+ // "From position" is the latest entry of a ledger, and "to position"
has a negative entry id & crosses ledgers.
+ Range<Position> range91 =
Range.closed(PositionFactory.create(ledger1.getLedgerId(), 9),
+ PositionFactory.create(ledger3.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range91), 11);
+ Range<Position> range92 =
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), 9),
+ PositionFactory.create(ledger3.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range92), 10);
+ Range<Position> range93 =
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), 9),
+ PositionFactory.create(ledger3.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range93), 11);
+
+ // "To Position" is larger than LAC.
+ Range<Position> range101 =
Range.closed(PositionFactory.create(ledger1.getLedgerId(), 9),
+ PositionFactory.create(ledger3.getLedgerId(), 100));
+ assertEquals(ml.getNumberOfEntries(range101), 21);
+ Range<Position> range102 =
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), 9),
+ PositionFactory.create(ledger3.getLedgerId(), 100));
+ assertEquals(ml.getNumberOfEntries(range102), 20);
+ Range<Position> range103 =
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), 9),
+ PositionFactory.create(ledger3.getLedgerId(), 100));
+ assertEquals(ml.getNumberOfEntries(range103), 20);
+
+ // "From position" is smaller than the first one.
+ Range<Position> range111 =
Range.closed(PositionFactory.create(ledger1.getLedgerId() - 1, 9),
+ PositionFactory.create(ledger3.getLedgerId(), 100));
+ assertEquals(ml.getNumberOfEntries(range111), 30);
+ Range<Position> range112 =
Range.openClosed(PositionFactory.create(ledger1.getLedgerId() - 1, 9),
+ PositionFactory.create(ledger3.getLedgerId(), 100));
+ assertEquals(ml.getNumberOfEntries(range112), 30);
+ Range<Position> range113 =
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId() - 1, 9),
+ PositionFactory.create(ledger3.getLedgerId(), 100));
+ assertEquals(ml.getNumberOfEntries(range113), 29);
+
+ // Both "fromPosition" and "toPosition" have negative entry id & in
the same ledger.
+ Range<Position> range121 =
Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1),
+ PositionFactory.create(ledger1.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range121), 0);
+ Range<Position> range122 =
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -10),
+ PositionFactory.create(ledger1.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range122), 0);
+ // Both "fromPosition" and "toPosition" have negative entry id &
crosses ledgers.
+ Range<Position> range123 =
Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1),
+ PositionFactory.create(ledger2.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range123), 10);
+ Range<Position> range124 =
Range.closed(PositionFactory.create(ledger1.getLedgerId(), -10),
+ PositionFactory.create(ledger3.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range124), 20);
+ Range<Position> range125 =
Range.closed(PositionFactory.create(ledger1.getLedgerId(), -10),
+ PositionFactory.create(ledger3.getLedgerId(), -1000));
+ assertEquals(ml.getNumberOfEntries(range125), 20);
+ Range<Position> range126 =
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1),
+ PositionFactory.create(ledger2.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range126), 10);
+ Range<Position> range127 =
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -10),
+ PositionFactory.create(ledger3.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range127), 20);
+ Range<Position> range128 =
Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -10),
+ PositionFactory.create(ledger3.getLedgerId(), -1000));
+ assertEquals(ml.getNumberOfEntries(range128), 20);
+ Range<Position> range129 =
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1),
+ PositionFactory.create(ledger2.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range129), 10);
+ Range<Position> range1210 =
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -10),
+ PositionFactory.create(ledger3.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range1210), 20);
+ Range<Position> range1211 =
Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -10),
+ PositionFactory.create(ledger3.getLedgerId(), -1000));
+ assertEquals(ml.getNumberOfEntries(range1211), 20);
+ try {
+ Range.openClosed(PositionFactory.create(ledger2.getLedgerId(),
-10),
+ PositionFactory.create(ledger1.getLedgerId(), -1));
+ fail("Should have failed because the range is invalid.");
+ } catch (IllegalArgumentException ex) {
+ assertTrue(ex.getMessage().contains("Invalid range"));
+ }
+
+ // Cover the following case.
+ // The use case "cursor.getNumberOfEntries()", which will use a
"toPosition" with an entry
+ // id that is larger than the LAC.
+ // The difference with above one: the LAC is not in the latest ledger.
+ ml.close();
+ ManagedLedgerImpl ml2 = (ManagedLedgerImpl) factory.open(ledgerName,
config);
+ assertNotEquals(ledger4.getLedgerId(), ml2.currentLedger.getId());
+ Range<Position> range131 = Range.closed(positions.get(0),
PositionFactory.create(ledger4.getLedgerId(), 100));
+ assertEquals(ml2.getNumberOfEntries(range131), 131);
+ Range<Position> range132 = Range.openClosed(positions.get(0),
PositionFactory.create(ledger4.getLedgerId(),
+ 100));
+ assertEquals(ml2.getNumberOfEntries(range132), 130);
+ Range<Position> range133 = Range.closedOpen(positions.get(0),
PositionFactory.create(ledger4.getLedgerId(),
+ 100));
+ assertEquals(ml2.getNumberOfEntries(range133), 130);
+
+ // cleanup.
+ ml2.delete();
+ }
+
+ @Test
+ public void testComparePositions() throws Exception {
+ final String ledgerName = "ml_" +
UUID.randomUUID().toString().replaceAll("-", "");
+ final String cursorName = "test-cursor";
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(ledgerName,
config);
+ // Create a cursor to avoid entries being trimmed.
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
ml.openCursor(cursorName);
+ int totalEntries = 30;
+ List<Position> positions = new ArrayList<>(totalEntries);
+ for (int i = 0; i < totalEntries; i++) {
+ Position pos = ml.addEntry(("entry-" + i).getBytes());
+ positions.add(pos);
+ }
+ Iterator<LedgerInfo> iterator =
ml.getLedgersInfo().values().iterator();
+ LedgerInfo ledger1 = iterator.next();
+ LedgerInfo ledger2 = iterator.next();
+ LedgerInfo ledger3 = iterator.next();
+ assertEquals(ledger1.getEntries(), 10);
+ assertEquals(ledger2.getEntries(), 10);
+
+ // Normal case: pos1 == pos2.
+ assertEquals(ml.comparePositions(positions.get(0), positions.get(0)),
0);
+ assertEquals(ml.comparePositions(positions.get(9), positions.get(9)),
0);
+ assertEquals(ml.comparePositions(positions.get(29),
positions.get(29)), 0);
+
assertEquals(ml.comparePositions(PositionFactory.create(ledger2.getLedgerId(),
-1),
+ PositionFactory.create(ledger2.getLedgerId(), -1)), 0);
+
+ // Normal case: pos1 < pos2.
+ assertEquals(ml.comparePositions(positions.get(0), positions.get(1)),
-1);
+ assertEquals(ml.comparePositions(positions.get(0), positions.get(9)),
-1);
+ assertEquals(ml.comparePositions(positions.get(0), positions.get(10)),
-1);
+ assertEquals(ml.comparePositions(positions.get(0), positions.get(19)),
-1);
+ assertEquals(ml.comparePositions(positions.get(0), positions.get(20)),
-1);
+ assertEquals(ml.comparePositions(positions.get(0), positions.get(29)),
-1);
+
+ // Normal case: pos1 > pos2.
+ assertEquals(ml.comparePositions(positions.get(1), positions.get(0)),
1);
+ assertEquals(ml.comparePositions(positions.get(9), positions.get(0)),
1);
+ assertEquals(ml.comparePositions(positions.get(10), positions.get(0)),
1);
+ assertEquals(ml.comparePositions(positions.get(19), positions.get(0)),
1);
+ assertEquals(ml.comparePositions(positions.get(20), positions.get(0)),
1);
+ assertEquals(ml.comparePositions(positions.get(29), positions.get(0)),
1);
+
+ // Pos1 has negative entry id & both positions in the same ledger.
+
assertEquals(ml.comparePositions(PositionFactory.create(ledger1.getLedgerId(),
-1),
+ positions.get(0)), -1);
+
assertEquals(ml.comparePositions(PositionFactory.create(ledger2.getLedgerId(),
-1),
+ positions.get(10)), -1);
+
assertEquals(ml.comparePositions(PositionFactory.create(ledger3.getLedgerId(),
-1),
+ positions.get(20)), -1);
+
assertEquals(ml.comparePositions(PositionFactory.create(ledger1.getLedgerId(),
-1),
+ positions.get(0)), -1);
+ // Pos1 has negative entry id & crosses ledgers.
+
assertEquals(ml.comparePositions(PositionFactory.create(ledger2.getLedgerId(),
-1),
+ positions.get(0)), 1);
+
assertEquals(ml.comparePositions(PositionFactory.create(ledger3.getLedgerId(),
-1),
+ positions.get(0)), 1);
+ // Pos1 has negative entry id & the same value.
+
assertEquals(ml.comparePositions(PositionFactory.create(ledger2.getLedgerId(),
-1), positions.get(9)),
+ 0);
+
assertEquals(ml.comparePositions(PositionFactory.create(ledger3.getLedgerId(),
-1), positions.get(19)),
+ 0);
+
+ // Pos2 has negative entry id & both positions in the same ledger.
+ assertEquals(ml.comparePositions(positions.get(0),
PositionFactory.create(ledger1.getLedgerId(), -1)),
+ 1);
+ assertEquals(ml.comparePositions(positions.get(10),
PositionFactory.create(ledger2.getLedgerId(), -1)),
+ 1);
+ assertEquals(ml.comparePositions(positions.get(20),
PositionFactory.create(ledger3.getLedgerId(), -1)),
+ 1);
+ assertEquals(ml.comparePositions(positions.get(0),
PositionFactory.create(ledger1.getLedgerId(), -1)),
+ 1);
+ // Pos2 has negative entry id & crosses ledgers.
+ assertEquals(ml.comparePositions(positions.get(0),
PositionFactory.create(ledger2.getLedgerId(), -1)),
+ -1);
+ assertEquals(ml.comparePositions(positions.get(0),
PositionFactory.create(ledger3.getLedgerId(), -1)),
+ -1);
+ // Pos2 has negative entry id & the same value.
+ assertEquals(ml.comparePositions(positions.get(9),
PositionFactory.create(ledger2.getLedgerId(), -1)),
+ 0);
+ assertEquals(ml.comparePositions(positions.get(19),
PositionFactory.create(ledger3.getLedgerId(), -1)),
+ 0);
+
+ // Pos1 does not exist in ledgers.
+
assertEquals(ml.comparePositions(PositionFactory.create(ledger1.getLedgerId() -
1, 100),
+ positions.get(0)), -1);
+
assertEquals(ml.comparePositions(PositionFactory.create(ledger3.getLedgerId() +
1, 0),
+ positions.get(29)), 1);
+
+ // Pos2 does not exist in ledgers.
+ assertEquals(ml.comparePositions(positions.get(0),
+ PositionFactory.create(ledger1.getLedgerId() - 1,
100)), 1);
+ assertEquals(ml.comparePositions(positions.get(29),
+ PositionFactory.create(ledger3.getLedgerId() + 1, 0)), -1);
+
+ // cleanup.
+ ml.delete();
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 39f5a62306e..b76328f252f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -81,6 +81,7 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.avro.Schema.Parser;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
@@ -5386,4 +5387,50 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
+
+ @DataProvider
+ public Object[][] trimLedgerBeforeGetStats() {
+ return new Object[][] {
+ {true},
+ {false}
+ };
+ }
+
+ @Test(dataProvider = "trimLedgerBeforeGetStats")
+ public void testBacklogAfterCreatedSubscription(boolean
trimLegderBeforeGetStats) throws Exception {
+ String topic =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp");
+ String mlName = TopicName.get(topic).getPersistenceNamingEncoding();
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(2);
+ config.setMinimumRolloverTime(1, TimeUnit.SECONDS);
+ if (!trimLegderBeforeGetStats) {
+ config.setRetentionTime(3600, TimeUnit.SECONDS);
+ }
+ ManagedLedgerFactory factory = pulsar.getDefaultManagedLedgerFactory();
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName,
config);
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+ for (int i = 0; i < 4; i++) {
+ producer.send("message-" + i);
+ Thread.sleep(1000);
+ }
+ producer.close();
+ PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topic).get();
+ assertEquals(persistentTopic.getManagedLedger(), ml);
+
+ if (trimLegderBeforeGetStats) {
+ CompletableFuture<Void> trimLedgerFuture = new
CompletableFuture<>();
+ ml.trimConsumedLedgersInBackground(trimLedgerFuture);
+ trimLedgerFuture.join();
+ assertEquals(ml.getLedgersInfo().size(), 1);
+ assertEquals(ml.getCurrentLedgerEntries(), 0);
+ }
+
+ admin.topics().createSubscription(topic, "sub1", MessageId.latest);
+
assertEquals(admin.topics().getStats(topic).getSubscriptions().get("sub1").getMsgBacklog(),
0);
+
+ // cleanup
+ admin.topics().delete(topic, false);
+ }
}