This is an automated email from the ASF dual-hosted git repository.

houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ff9703d740 [improve][java-client]Shrink BatchMessageContainer maxBatchSize (#17854)
9ff9703d740 is described below

commit 9ff9703d740eb8151b8cf2eb1e7faf074e9cf3c7
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Sun Oct 2 10:37:38 2022 +0800

    [improve][java-client]Shrink BatchMessageContainer maxBatchSize (#17854)
---
 .../client/impl/AbstractBatchMessageContainer.java |  4 ++
 .../client/impl/BatchMessageContainerImpl.java     | 26 +++++++++-
 .../client/impl/BatchMessageContainerImplTest.java | 60 +++++++++++++++++++++-
 3 files changed, 87 insertions(+), 3 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
index 9b4d1b7d683..784d1e05ac6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
@@ -84,6 +84,10 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta
         return currentBatchSizeBytes;
     }
 
+    int getMaxBatchSize() { // package-private accessor: returns the current value of maxBatchSize
+        return maxBatchSize;
+    }
+
     @Override
     public List<ProducerImpl.OpSendMsg> createOpSendMsgs() throws IOException {
         throw new UnsupportedOperationException();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 49cbc56d2a6..99e82a8c765 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -64,6 +64,8 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
     protected SendCallback firstCallback;
 
     private final ByteBufAllocator allocator;
+    private static final int SHRINK_COOLING_OFF_PERIOD = 10;
+    private int consecutiveShrinkTime = 0;
 
     public BatchMessageContainerImpl() {
         this(PulsarByteBufAllocator.DEFAULT);
@@ -98,7 +100,8 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
                 messageMetadata.setSequenceId(msg.getSequenceId());
                 lowestSequenceId = 
Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
                 this.firstCallback = callback;
-                batchedMessageMetadataAndPayload = allocator.compositeBuffer();
+                batchedMessageMetadataAndPayload = allocator.buffer(
+                        Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
                 if (msg.getMessageBuilder().hasTxnidMostBits() && 
currentTxnidMostBits == -1) {
                     currentTxnidMostBits = 
msg.getMessageBuilder().getTxnidMostBits();
                 }
@@ -167,11 +170,30 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
 
         // Update the current max batch size using the uncompressed size, 
which is what we need in any case to
         // accumulate the batch content
-        maxBatchSize = Math.max(maxBatchSize, uncompressedSize);
+        updateMaxBatchSize(uncompressedSize);
         maxMessagesNum = Math.max(maxMessagesNum, numMessagesInBatch);
         return compressedPayload;
     }
 
+    void updateMaxBatchSize(int uncompressedSize) {
+        if (uncompressedSize > maxBatchSize) {
+            maxBatchSize = uncompressedSize; // batch outgrew the estimate: adopt its size at once
+            consecutiveShrinkTime = 0;
+            return;
+        }
+        final int reduced = maxBatchSize - (maxBatchSize >> 2); // the estimate minus a quarter of itself
+        if (uncompressedSize > reduced) {
+            consecutiveShrinkTime = 0; // batch would not fit the reduced estimate: streak broken
+            return;
+        }
+        if (consecutiveShrinkTime <= SHRINK_COOLING_OFF_PERIOD) {
+            consecutiveShrinkTime++; // still cooling off: count the small batch, keep the estimate
+            return;
+        }
+        maxBatchSize = reduced; // streak outlasted the cooling-off period: shrink and start over
+        consecutiveShrinkTime = 0;
+    }
+
     @Override
     public void clear() {
         messages = new ArrayList<>(maxMessagesNum);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
index a4498b952cb..29d42338835 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -40,6 +41,63 @@ import org.testng.annotations.Test;
 
 public class BatchMessageContainerImplTest {
 
+    @Test
+    public void testUpdateMaxBatchSize() {
+        final int coolingOffPeriod = 10; // same value as the SHRINK_COOLING_OFF_PERIOD constant in the container
+        BatchMessageContainerImpl messageContainer = new BatchMessageContainerImpl();
+        // initial state of a freshly constructed container
+        assertEquals(messageContainer.getMaxBatchSize(), 1024);
+
+        // a larger batch expands the estimate immediately
+        messageContainer.updateMaxBatchSize(2048);
+        assertEquals(messageContainer.getMaxBatchSize(), 2048);
+
+        // a single small batch does not shrink it (cooling-off period)
+        messageContainer.updateMaxBatchSize(2);
+        assertEquals(messageContainer.getMaxBatchSize(), 2048);
+
+        // an unbroken run of small batches shrinks the estimate once, by a quarter
+        for (int i = 0; i < 15; ++i) {
+            messageContainer.updateMaxBatchSize(2);
+            if (i < coolingOffPeriod) {
+                assertEquals(messageContainer.getMaxBatchSize(), 2048);
+            } else {
+                assertEquals(messageContainer.getMaxBatchSize(), 1536); // 2048 - (2048 >> 2), exact int
+            }
+        }
+
+        messageContainer.updateMaxBatchSize(2048);
+        // one big batch in the middle of a small-batch run restarts the cooling-off count
+        for (int i = 0; i < 15; ++i) {
+            if (i == coolingOffPeriod - 2) {
+                messageContainer.updateMaxBatchSize(2000);
+            } else {
+                messageContainer.updateMaxBatchSize(2);
+            }
+            assertEquals(messageContainer.getMaxBatchSize(), 2048);
+        }
+
+        // alternating small and big batches never complete a cooling-off run
+        for (int i = 0; i < coolingOffPeriod * 3; ++i) {
+            if (i % 2 == 0) {
+                messageContainer.updateMaxBatchSize(2);
+            } else {
+                messageContainer.updateMaxBatchSize(2000);
+            }
+            assertEquals(messageContainer.getMaxBatchSize(), 2048);
+        }
+
+        // consecutive big batches that still fit leave the estimate unchanged
+        for (int i = 0; i < 15; ++i) {
+            messageContainer.updateMaxBatchSize(2000);
+            assertEquals(messageContainer.getMaxBatchSize(), 2048);
+        }
+
+        // a batch above the current estimate expands it again
+        messageContainer.updateMaxBatchSize(4096);
+        assertEquals(messageContainer.getMaxBatchSize(), 4096);
+    }
+
     @Test
     public void recoveryAfterOom() {
         final AtomicBoolean called = new AtomicBoolean();
@@ -62,7 +120,7 @@ public class BatchMessageContainerImplTest {
         doAnswer((ignore) -> {
             called.set(true);
             throw new OutOfMemoryError("test");
-        }).when(mockAllocator).compositeBuffer();
+        }).when(mockAllocator).buffer(anyInt());
         final BatchMessageContainerImpl batchMessageContainer = new 
BatchMessageContainerImpl(mockAllocator);
         batchMessageContainer.setProducer(producer);
         MessageMetadata messageMetadata1 = new MessageMetadata();

Reply via email to