This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 35a16768ff0 [improve] [broker] Make the estimated entry size more
accurate (#23931)
35a16768ff0 is described below
commit 35a16768ff095449e228a8aa1774b26a068e67e9
Author: fengyubiao <[email protected]>
AuthorDate: Tue Feb 25 12:57:17 2025 +0800
[improve] [broker] Make the estimated entry size more accurate (#23931)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 58 +++++++++----
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +
.../mledger/impl/cache/RangeEntryCacheImpl.java | 12 +--
.../impl/InflightReadsLimiterIntegrationTest.java | 15 ++--
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 98 ++++++++++++++++++++--
.../BatchMessageWithBatchIndexLevelTest.java | 12 +--
.../pulsar/broker/stats/ConsumerStatsTest.java | 19 +++--
7 files changed, 166 insertions(+), 50 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 203d48933f0..32d46ff1c3c 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -24,6 +24,7 @@ import static
org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLed
import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
+import static
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
import static
org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
@@ -3810,26 +3811,51 @@ public class ManagedCursorImpl implements ManagedCursor
{
if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
return maxEntries;
}
+ int maxEntriesBasedOnSize =
+ Long.valueOf(estimateEntryCountBySize(maxSizeBytes,
readPosition, ledger)).intValue();
+ return Math.min(maxEntriesBasedOnSize, maxEntries);
+ }
- double avgEntrySize = ledger.getStats().getEntrySizeAverage();
- if (!Double.isFinite(avgEntrySize)) {
- // We don't have yet any stats on the topic entries. Let's try to
use the cursor avg size stats
- avgEntrySize = (double) entriesReadSize / (double)
entriesReadCount;
- }
-
- if (!Double.isFinite(avgEntrySize)) {
- // If we still don't have any information, it means this is the
first time we attempt reading
- // and there are no writes. Let's start with 1 to avoid any
overflow and start the avg stats
- return 1;
+ static long estimateEntryCountBySize(long bytesSize, Position
readPosition, ManagedLedgerImpl ml) {
+ Position posToRead = readPosition;
+ if (!ml.isValidPosition(readPosition)) {
+ posToRead = ml.getNextValidPosition(readPosition);
}
+ long result = 0;
+ long remainingBytesSize = bytesSize;
- int maxEntriesBasedOnSize = (int) (maxSizeBytes / avgEntrySize);
- if (maxEntriesBasedOnSize < 1) {
- // We need to read at least one entry
- return 1;
+ while (remainingBytesSize > 0) {
+ // Last ledger.
+ if (posToRead.getLedgerId() == ml.getCurrentLedger().getId()) {
+ if (ml.getCurrentLedgerSize() == 0 ||
ml.getCurrentLedgerEntries() == 0) {
+ // Only read 1 entry if no entries to read.
+ return 1;
+ }
+ long avg = Math.max(1, ml.getCurrentLedgerSize() /
ml.getCurrentLedgerEntries())
+ + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+ result += remainingBytesSize / avg;
+ break;
+ }
+ // Skip empty ledger.
+ LedgerInfo ledgerInfo =
ml.getLedgersInfo().get(posToRead.getLedgerId());
+ if (ledgerInfo.getSize() == 0 || ledgerInfo.getEntries() == 0) {
+ posToRead =
ml.getNextValidPosition(PositionFactory.create(posToRead.getLedgerId(),
Long.MAX_VALUE));
+ continue;
+ }
+ // Calculate entries by average of ledgers.
+ long avg = Math.max(1, ledgerInfo.getSize() /
ledgerInfo.getEntries()) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+ long remainEntriesOfLedger = ledgerInfo.getEntries() -
posToRead.getEntryId();
+ if (remainEntriesOfLedger * avg >= remainingBytesSize) {
+ result += remainingBytesSize / avg;
+ break;
+ } else {
+ // Calculate for the next ledger.
+ result += remainEntriesOfLedger;
+ remainingBytesSize -= remainEntriesOfLedger * avg;
+ posToRead =
ml.getNextValidPosition(PositionFactory.create(posToRead.getLedgerId(),
Long.MAX_VALUE));
+ }
}
-
- return Math.min(maxEntriesBasedOnSize, maxEntries);
+ return Math.max(result, 1);
}
@Override
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 607b6d09cc2..ad2435756c1 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -224,6 +224,8 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
private final CallbackMutex offloadMutex = new CallbackMutex();
public static final CompletableFuture<Position> NULL_OFFLOAD_PROMISE =
CompletableFuture
.completedFuture(PositionFactory.LATEST);
+ @VisibleForTesting
+ @Getter
protected volatile LedgerHandle currentLedger;
protected volatile long currentLedgerEntries = 0;
protected volatile long currentLedgerSize = 0;
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index b81015ea639..9a2de9ba8c4 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -303,7 +303,7 @@ public class RangeEntryCacheImpl implements EntryCache {
doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition,
numberOfEntries, shouldCacheEntry,
originalCallback, ctx);
} else {
- long estimatedEntrySize = getEstimatedEntrySize();
+ long estimatedEntrySize = getEstimatedEntrySize(lh);
long estimatedReadSize = numberOfEntries * estimatedEntrySize;
if (log.isDebugEnabled()) {
log.debug("Estimated read size: {} bytes for {} entries with
{} estimated entry size",
@@ -419,12 +419,12 @@ public class RangeEntryCacheImpl implements EntryCache {
}
@VisibleForTesting
- public long getEstimatedEntrySize() {
- long estimatedEntrySize = getAvgEntrySize();
- if (estimatedEntrySize == 0) {
- estimatedEntrySize = DEFAULT_ESTIMATED_ENTRY_SIZE;
+ public long getEstimatedEntrySize(ReadHandle lh) {
+ if (lh.getLength() == 0 || lh.getLastAddConfirmed() < 0) {
+ // No entries stored.
+ return Math.max(getAvgEntrySize(), DEFAULT_ESTIMATED_ENTRY_SIZE) +
BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
}
- return estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+ return Math.max(1, lh.getLength() / (lh.getLastAddConfirmed() + 1)) +
BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
}
private long getAvgEntrySize() {
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
index 48f0cf08ddf..6676baf8b55 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
@@ -141,10 +141,9 @@ public class InflightReadsLimiterIntegrationTest extends
MockedBookKeeperTestCas
SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback();
entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, true, cb0, ctx);
cb0.entries.join();
- Long sizePerEntry1 = entryCache.getEstimatedEntrySize();
- Assert.assertEquals(sizePerEntry1, 1 +
RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+ int sizePerEntry =
Long.valueOf(entryCache.getEstimatedEntrySize(ml.currentLedger)).intValue();
Awaitility.await().untilAsserted(() -> {
- long remainingBytes =limiter.getRemainingBytes();
+ long remainingBytes = limiter.getRemainingBytes();
Assert.assertEquals(remainingBytes, totalCapacity);
});
log.info("remainingBytes 0: {}", limiter.getRemainingBytes());
@@ -165,7 +164,7 @@ public class InflightReadsLimiterIntegrationTest extends
MockedBookKeeperTestCas
entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, true,
cb2, ctx);
}).start();
- long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1
+ readCount2, 1);
+ long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1
+ readCount2, sizePerEntry);
long remainingBytesExpected1 = totalCapacity - bytesAcquired1;
log.info("acquired : {}", bytesAcquired1);
log.info("remainingBytesExpected 0 : {}", remainingBytesExpected1);
@@ -178,9 +177,7 @@ public class InflightReadsLimiterIntegrationTest extends
MockedBookKeeperTestCas
Thread.sleep(3000);
readCompleteSignal1.countDown();
cb1.entries.join();
- Long sizePerEntry2 = entryCache.getEstimatedEntrySize();
- Assert.assertEquals(sizePerEntry2, 1 +
RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
- long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2,
1);
+ long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2,
sizePerEntry);
long remainingBytesExpected2 = totalCapacity - bytesAcquired2;
log.info("acquired : {}", bytesAcquired2);
log.info("remainingBytesExpected 1: {}", remainingBytesExpected2);
@@ -191,8 +188,6 @@ public class InflightReadsLimiterIntegrationTest extends
MockedBookKeeperTestCas
readCompleteSignal2.countDown();
cb2.entries.join();
- Long sizePerEntry3 = entryCache.getEstimatedEntrySize();
- Assert.assertEquals(sizePerEntry3, 1 +
RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
Awaitility.await().untilAsserted(() -> {
long remainingBytes = limiter.getRemainingBytes();
log.info("remainingBytes 2: {}", remainingBytes);
@@ -204,7 +199,7 @@ public class InflightReadsLimiterIntegrationTest extends
MockedBookKeeperTestCas
}
private long calculateBytesSizeBeforeFirstReading(int entriesCount, int
perEntrySize) {
- return entriesCount * (perEntrySize +
RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+ return entriesCount * perEntrySize;
}
class SimpleReadEntriesCallback implements
AsyncCallbacks.ReadEntriesCallback {
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index d3ea98131ad..1cb09d99539 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;
+import static
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
@@ -686,13 +687,15 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
ManagedCursor cursor = ledger.openCursor("c1");
for (int i = 0; i < 100; i++) {
- ledger.addEntry(new byte[1024]);
+ ledger.addEntry(new byte[(int) (1024)]);
}
- // First time, since we don't have info, we'll get 1 single entry
- readAndCheck(cursor, 10, 3 * 1024, 1);
+ // Since https://github.com/apache/pulsar/pull/23931 improved the
performance of delivery, the consumer
+ // will get more messages than before(it only receives 1 messages at
the first delivery),
+ int avg = (int) (BOOKKEEPER_READ_OVERHEAD_PER_ENTRY + 1024);
+ readAndCheck(cursor, 10, 3 * avg, 3);
// We should only return 3 entries, based on the max size
- readAndCheck(cursor, 20, 3 * 1024, 3);
+ readAndCheck(cursor, 20, 3 * avg, 3);
// If maxSize is < avg, we should get 1 entry
readAndCheck(cursor, 10, 500, 1);
}
@@ -3914,13 +3917,15 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
ledger.addEntry(new byte[1024]);
}
- // First time, since we don't have info, we'll get 1 single entry
- List<Entry> entries = c.readEntriesOrWait(10, 3 * 1024);
- assertEquals(entries.size(), 1);
+ // Since https://github.com/apache/pulsar/pull/23931 improved the
performance of delivery, the consumer
+ // will get more messages than before(it only receives 1 messages at
the first delivery),
+ int avg = (int) (1024 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+ List<Entry> entries = c.readEntriesOrWait(10, 3 * avg);
+ assertEquals(entries.size(), 3);
entries.forEach(Entry::release);
// We should only return 3 entries, based on the max size
- entries = c.readEntriesOrWait(10, 3 * 1024);
+ entries = c.readEntriesOrWait(10, 3 * avg);
assertEquals(entries.size(), 3);
entries.forEach(Entry::release);
@@ -5164,6 +5169,83 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
assertEquals(positionRef4.get(), position4);
}
+ @Test
+ public void testEstimateEntryCountBySize() throws Exception {
+ final String mlName = "ml-" +
UUID.randomUUID().toString().replaceAll("-", "");
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);
+ long entryCount0 =
+ ManagedCursorImpl.estimateEntryCountBySize(16,
PositionFactory.create(ml.getCurrentLedger().getId(), 0), ml);
+ assertEquals(entryCount0, 1);
+ // Avoid trimming ledgers.
+ ml.openCursor("c1");
+
+ // Build data.
+ for (int i = 0; i < 100; i++) {
+ ml.addEntry(new byte[]{1});
+ }
+ long ledger1 = ml.getCurrentLedger().getId();
+ ml.getCurrentLedger().close();
+ ml.ledgerClosed(ml.getCurrentLedger());
+ for (int i = 0; i < 100; i++) {
+ ml.addEntry(new byte[]{1, 2});
+ }
+ long ledger2 = ml.getCurrentLedger().getId();
+ ml.getCurrentLedger().close();
+ ml.ledgerClosed(ml.getCurrentLedger());
+ for (int i = 0; i < 100; i++) {
+ ml.addEntry(new byte[]{1, 2, 3, 4});
+ }
+ long ledger3 = ml.getCurrentLedger().getId();
+ MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo1 =
ml.getLedgersInfo().get(ledger1);
+ MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo2 =
ml.getLedgersInfo().get(ledger2);
+ long average1 = ledgerInfo1.getSize() / ledgerInfo1.getEntries() +
BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+ long average2 = ledgerInfo2.getSize() / ledgerInfo2.getEntries() +
BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+ long average3 = ml.getCurrentLedgerSize() /
ml.getCurrentLedgerEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+ assertEquals(average1, 1 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+ assertEquals(average2, 2 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+ assertEquals(average3, 4 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+
+ // Test: the individual ledgers.
+ long entryCount1 =
+ ManagedCursorImpl.estimateEntryCountBySize(average1 * 16,
PositionFactory.create(ledger1, 0), ml);
+ assertEquals(entryCount1, 16);
+ long entryCount2 =
+ ManagedCursorImpl.estimateEntryCountBySize(average2 * 8,
PositionFactory.create(ledger2, 0), ml);
+ assertEquals(entryCount2, 8);
+ long entryCount3 =
+ ManagedCursorImpl.estimateEntryCountBySize(average3 * 4,
PositionFactory.create(ledger3, 0), ml);
+ assertEquals(entryCount3, 4);
+
+ // Test: across ledgers.
+ long entryCount4 =
+ ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) +
(average2 * 8), PositionFactory.create(ledger1, 0), ml);
+ assertEquals(entryCount4, 108);
+ long entryCount5 =
+ ManagedCursorImpl.estimateEntryCountBySize((average2 * 100) +
(average3 * 4), PositionFactory.create(ledger2, 0), ml);
+ assertEquals(entryCount5, 104);
+ long entryCount6 =
+ ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) +
(average2 * 100) + (average3 * 4), PositionFactory.create(ledger1, 0), ml);
+ assertEquals(entryCount6, 204);
+
+ long entryCount7 =
+ ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) +
(average2 * 8), PositionFactory.create(ledger1, 80), ml);
+ assertEquals(entryCount7, 28);
+ long entryCount8 =
+ ManagedCursorImpl.estimateEntryCountBySize((average2 * 20) +
(average3 * 4), PositionFactory.create(ledger2, 80), ml);
+ assertEquals(entryCount8, 24);
+ long entryCount9 =
+ ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) +
(average2 * 100) + (average3 * 4), PositionFactory.create(ledger1, 80), ml);
+ assertEquals(entryCount9, 124);
+
+ // Test: read more than entries written.
+ long entryCount10 =
+ ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) +
(average2 * 100) + (average3 * 100) + (average3 * 4) ,
PositionFactory.create(ledger1, 0), ml);
+ assertEquals(entryCount10, 304);
+
+ // cleanup.
+ ml.delete();
+ }
+
@Test
void testForceCursorRecovery() throws Exception {
TestPulsarMockBookKeeper bk = new TestPulsarMockBookKeeper(executor);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
index 7fa7bf078e0..f21ac130e3c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -85,7 +85,7 @@ public class BatchMessageWithBatchIndexLevelTest extends
BatchMessageTest {
.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
- .receiverQueueSize(10)
+ .receiverQueueSize(50)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
@@ -114,27 +114,29 @@ public class BatchMessageWithBatchIndexLevelTest extends
BatchMessageTest {
consumer.acknowledge(receive1);
consumer.acknowledge(receive2);
Awaitility.await().untilAsserted(() -> {
-
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 18);
+ // Since https://github.com/apache/pulsar/pull/23931 improved the
mechanism of estimate average entry size,
+ // broker will deliver much messages than before. So edit 18 -> 38
here.
+
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 38);
});
Message<byte[]> receive3 = consumer.receive();
Message<byte[]> receive4 = consumer.receive();
consumer.acknowledge(receive3);
consumer.acknowledge(receive4);
Awaitility.await().untilAsserted(() -> {
-
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
+
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36);
});
// Block cmd-flow send until verify finish. see:
https://github.com/apache/pulsar/pull/17436.
consumer.pause();
Message<byte[]> receive5 = consumer.receive();
consumer.negativeAcknowledge(receive5);
Awaitility.await().pollInterval(1,
TimeUnit.MILLISECONDS).untilAsserted(() -> {
-
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0);
+
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 20);
});
// Unblock cmd-flow.
consumer.resume();
consumer.receive();
Awaitility.await().untilAsserted(() -> {
-
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
+
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36);
});
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 59a911500e5..fc650127f90 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -445,8 +445,13 @@ public class ConsumerStatsTest extends
ProducerConsumerBase {
.batchingMaxPublishDelay(5, TimeUnit.SECONDS)
.batchingMaxBytes(Integer.MAX_VALUE)
.create();
-
- producer.send("first-message");
+ // The first messages deliver: 20 msgs.
+ // Average of "messages per batch" is "1".
+ for (int i = 0; i < 20; i++) {
+ producer.send("first-message");
+ }
+ // The second messages deliver: 20 msgs.
+ // Average of "messages per batch" is "Math.round(1 * 0.9 + 20 * 0.1)
= 2.9 ~ 3".
List<CompletableFuture<MessageId>> futures = new ArrayList<>();
for (int i = 0; i < 20; i++) {
futures.add(producer.sendAsync("message"));
@@ -480,6 +485,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase
{
metadataConsumer.put("matchValueReschedule", "producer2");
@Cleanup
Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(topic).properties(metadataConsumer)
+ .receiverQueueSize(20)
.subscriptionName(subName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
int counter = 0;
@@ -494,14 +500,17 @@ public class ConsumerStatsTest extends
ProducerConsumerBase {
}
}
- assertEquals(21, counter);
+ assertEquals(40, counter);
ConsumerStats consumerStats =
admin.topics().getStats(topic).getSubscriptions().get(subName).getConsumers().get(0);
- assertEquals(21, consumerStats.getMsgOutCounter());
+ assertEquals(40, consumerStats.getMsgOutCounter());
- // Math.round(1 * 0.9 + 0.1 * (20 / 1))
+ // The first messages deliver: 20 msgs.
+ // Average of "messages per batch" is "1".
+ // The second messages deliver: 20 msgs.
+ // Average of "messages per batch" is "Math.round(1 * 0.9 + 20 * 0.1)
= 2.9 ~ 3".
int avgMessagesPerEntry = consumerStats.getAvgMessagesPerEntry();
assertEquals(3, avgMessagesPerEntry);
}