This is an automated email from the ASF dual-hosted git repository.
houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9ff9703d740 [improve][java-client]Shrink BatchMessageContainer
maxBatchSize (#17854)
9ff9703d740 is described below
commit 9ff9703d740eb8151b8cf2eb1e7faf074e9cf3c7
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Sun Oct 2 10:37:38 2022 +0800
[improve][java-client]Shrink BatchMessageContainer maxBatchSize (#17854)
---
.../client/impl/AbstractBatchMessageContainer.java | 4 ++
.../client/impl/BatchMessageContainerImpl.java | 26 +++++++++-
.../client/impl/BatchMessageContainerImplTest.java | 60 +++++++++++++++++++++-
3 files changed, 87 insertions(+), 3 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
index 9b4d1b7d683..784d1e05ac6 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
@@ -84,6 +84,10 @@ public abstract class AbstractBatchMessageContainer
implements BatchMessageConta
return currentBatchSizeBytes;
}
+ int getMaxBatchSize() {
+ return maxBatchSize;
+ }
+
@Override
public List<ProducerImpl.OpSendMsg> createOpSendMsgs() throws IOException {
throw new UnsupportedOperationException();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 49cbc56d2a6..99e82a8c765 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -64,6 +64,8 @@ class BatchMessageContainerImpl extends
AbstractBatchMessageContainer {
protected SendCallback firstCallback;
private final ByteBufAllocator allocator;
+ private static final int SHRINK_COOLING_OFF_PERIOD = 10;
+ private int consecutiveShrinkTime = 0;
public BatchMessageContainerImpl() {
this(PulsarByteBufAllocator.DEFAULT);
@@ -98,7 +100,8 @@ class BatchMessageContainerImpl extends
AbstractBatchMessageContainer {
messageMetadata.setSequenceId(msg.getSequenceId());
lowestSequenceId =
Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
this.firstCallback = callback;
- batchedMessageMetadataAndPayload = allocator.compositeBuffer();
+ batchedMessageMetadataAndPayload = allocator.buffer(
+ Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
if (msg.getMessageBuilder().hasTxnidMostBits() &&
currentTxnidMostBits == -1) {
currentTxnidMostBits =
msg.getMessageBuilder().getTxnidMostBits();
}
@@ -167,11 +170,30 @@ class BatchMessageContainerImpl extends
AbstractBatchMessageContainer {
// Update the current max batch size using the uncompressed size,
which is what we need in any case to
// accumulate the batch content
- maxBatchSize = Math.max(maxBatchSize, uncompressedSize);
+ updateMaxBatchSize(uncompressedSize);
maxMessagesNum = Math.max(maxMessagesNum, numMessagesInBatch);
return compressedPayload;
}
+ void updateMaxBatchSize(int uncompressedSize) {
+ if (uncompressedSize > maxBatchSize) {
+ maxBatchSize = uncompressedSize;
+ consecutiveShrinkTime = 0;
+ } else {
+ int shrank = maxBatchSize - (maxBatchSize >> 2);
+ if (uncompressedSize <= shrank) {
+ if (consecutiveShrinkTime <= SHRINK_COOLING_OFF_PERIOD) {
+ consecutiveShrinkTime++;
+ } else {
+ maxBatchSize = shrank;
+ consecutiveShrinkTime = 0;
+ }
+ } else {
+ consecutiveShrinkTime = 0;
+ }
+ }
+ }
+
@Override
public void clear() {
messages = new ArrayList<>(maxMessagesNum);
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
index a4498b952cb..29d42338835 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -40,6 +41,63 @@ import org.testng.annotations.Test;
public class BatchMessageContainerImplTest {
+ @Test
+ public void testUpdateMaxBatchSize() {
+ int SHRINK_COOLING_OFF_PERIOD = 10;
+ BatchMessageContainerImpl messageContainer = new
BatchMessageContainerImpl();
+ // check init state
+ assertEquals(messageContainer.getMaxBatchSize(), 1024);
+
+ // test expand
+ messageContainer.updateMaxBatchSize(2048);
+ assertEquals(messageContainer.getMaxBatchSize(), 2048);
+
+ // test cooling-off period
+ messageContainer.updateMaxBatchSize(2);
+ assertEquals(messageContainer.getMaxBatchSize(), 2048);
+
+ // test shrink
+ for (int i = 0; i < 15; ++i) {
+ messageContainer.updateMaxBatchSize(2);
+ if (i < SHRINK_COOLING_OFF_PERIOD) {
+ assertEquals(messageContainer.getMaxBatchSize(), 2048);
+ } else {
+ assertEquals(messageContainer.getMaxBatchSize(), 2048 * 0.75);
+ }
+ }
+
+ messageContainer.updateMaxBatchSize(2048);
+ // test big message sudden appearance
+ for (int i = 0; i < 15; ++i) {
+ if (i == SHRINK_COOLING_OFF_PERIOD - 2) {
+ messageContainer.updateMaxBatchSize(2000);
+ } else {
+ messageContainer.updateMaxBatchSize(2);
+ }
+ assertEquals(messageContainer.getMaxBatchSize(), 2048);
+ }
+
+ // test big and small message alternating occurrence
+ for (int i = 0; i < SHRINK_COOLING_OFF_PERIOD * 3; ++i) {
+ if (i % 2 ==0) {
+ messageContainer.updateMaxBatchSize(2);
+ } else {
+ messageContainer.updateMaxBatchSize(2000);
+ }
+ assertEquals(messageContainer.getMaxBatchSize(), 2048);
+ }
+
+ // test consecutive big message
+ for (int i = 0; i < 15; ++i) {
+ messageContainer.updateMaxBatchSize(2000);
+ assertEquals(messageContainer.getMaxBatchSize(), 2048);
+ }
+
+ // test expand after shrink
+ messageContainer.updateMaxBatchSize(4096);
+ assertEquals(messageContainer.getMaxBatchSize(), 4096);
+ }
+
@Test
public void recoveryAfterOom() {
final AtomicBoolean called = new AtomicBoolean();
@@ -62,7 +120,7 @@ public class BatchMessageContainerImplTest {
doAnswer((ignore) -> {
called.set(true);
throw new OutOfMemoryError("test");
- }).when(mockAllocator).compositeBuffer();
+ }).when(mockAllocator).buffer(anyInt());
final BatchMessageContainerImpl batchMessageContainer = new
BatchMessageContainerImpl(mockAllocator);
batchMessageContainer.setProducer(producer);
MessageMetadata messageMetadata1 = new MessageMetadata();