This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 71598c11637 [fix][client]Fixed getting an incorrect `maxMessageSize` value when accessing multiple clusters in the same process (#22306)
71598c11637 is described below

commit 71598c1163730defb9fdea85e813fe863c3fe4d2
Author: atomchen <492672...@qq.com>
AuthorDate: Thu Mar 21 17:30:40 2024 +0800

    [fix][client]Fixed getting an incorrect `maxMessageSize` value when accessing multiple clusters in the same process (#22306)
    
    Co-authored-by: atomchchen <atomchc...@tencent.com>
---
 .../client/api/SimpleProducerConsumerTest.java     |  6 ++--
 .../client/impl/ProducerMemoryLimitTest.java       | 12 ++++----
 .../pulsar/client/impl/ProducerSemaphoreTest.java  | 18 ++++++------
 .../client/impl/AbstractBatchMessageContainer.java |  9 ++++--
 .../client/impl/BatchMessageContainerImpl.java     | 10 +++----
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  3 +-
 .../pulsar/client/impl/ConnectionHandler.java      |  7 +++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  3 +-
 .../apache/pulsar/client/impl/ProducerImpl.java    | 32 ++++++++++++++--------
 9 files changed, 60 insertions(+), 40 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 4536bda907b..4c106d39e7a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -93,13 +93,13 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
-import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConsumerBase;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.PartitionedProducerImpl;
+import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
@@ -3906,11 +3906,11 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
                 .topic("persistent://my-property/my-ns/my-topic2");
 
         @Cleanup
-        Producer<byte[]> producer = producerBuilder.create();
+        ProducerImpl<byte[]> producer = 
(ProducerImpl<byte[]>)producerBuilder.create();
         List<Future<MessageId>> futures = new ArrayList<>();
 
         // Asynchronously produce messages
-        byte[] message = new byte[ClientCnx.getMaxMessageSize() + 1];
+        byte[] message = new 
byte[producer.getConnectionHandler().getMaxMessageSize() + 1];
         for (int i = 0; i < maxPendingMessages + 10; i++) {
             Future<MessageId> future = producer.sendAsync(message);
             try {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index d776fdb0ed9..55a67ae644d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import io.netty.buffer.ByteBufAllocator;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
@@ -33,7 +34,6 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SizeUnit;
-import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -69,10 +69,12 @@ public class ProducerMemoryLimitTest extends 
ProducerConsumerBase {
                 .create();
         this.stopBroker();
         try {
-            try (MockedStatic<ClientCnx> mockedStatic = 
Mockito.mockStatic(ClientCnx.class)) {
-                mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(8);
-                producer.send("memory-test".getBytes(StandardCharsets.UTF_8));
-            }
+            ConnectionHandler connectionHandler = 
Mockito.spy(producer.getConnectionHandler());
+            Field field = 
producer.getClass().getDeclaredField("connectionHandler");
+            field.setAccessible(true);
+            field.set(producer, connectionHandler);
+            when(connectionHandler.getMaxMessageSize()).thenReturn(8);
+            producer.send("memory-test".getBytes(StandardCharsets.UTF_8));
             throw new IllegalStateException("can not reach here");
         } catch (PulsarClientException.InvalidMessageException ex) {
             PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index 2f8cb655401..42f431e0b9b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -22,6 +22,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import io.netty.buffer.ByteBufAllocator;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
@@ -33,7 +34,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -72,13 +72,14 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
                 .maxPendingMessages(pendingQueueSize)
                 .enableBatching(true)
                 .create();
-
         this.stopBroker();
         try {
-            try (MockedStatic<ClientCnx> mockedStatic = 
Mockito.mockStatic(ClientCnx.class)) {
-                mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(2);
-                
producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
-            }
+            ConnectionHandler connectionHandler = 
Mockito.spy(producer.getConnectionHandler());
+            Field field = 
producer.getClass().getDeclaredField("connectionHandler");
+            field.setAccessible(true);
+            field.set(producer, connectionHandler);
+            when(connectionHandler.getMaxMessageSize()).thenReturn(2);
+            producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
             throw new IllegalStateException("can not reach here");
         } catch (PulsarClientException.InvalidMessageException ex) {
             
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize);
@@ -86,10 +87,7 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
 
         producer.conf.setBatchingEnabled(false);
         try {
-            try (MockedStatic<ClientCnx> mockedStatic = 
Mockito.mockStatic(ClientCnx.class)) {
-                mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(2);
-                
producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
-            }
+            producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
             throw new IllegalStateException("can not reach here");
         } catch (PulsarClientException.InvalidMessageException ex) {
             
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
index 8c17d8fcb25..3ba7866350a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.api.proto.CompressionType;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.protocol.Commands;
 
 /**
  * Batch message container framework.
@@ -59,14 +60,18 @@ public abstract class AbstractBatchMessageContainer 
implements BatchMessageConta
     public boolean haveEnoughSpace(MessageImpl<?> msg) {
         int messageSize = msg.getDataBuffer().readableBytes();
         return (
-            (maxBytesInBatch <= 0 && (messageSize + currentBatchSizeBytes) <= 
ClientCnx.getMaxMessageSize())
+            (maxBytesInBatch <= 0 && (messageSize + currentBatchSizeBytes) <= 
getMaxMessageSize())
             || (maxBytesInBatch > 0 && (messageSize + currentBatchSizeBytes) 
<= maxBytesInBatch)
         ) && (maxNumMessagesInBatch <= 0 || numMessagesInBatch < 
maxNumMessagesInBatch);
     }
+    protected int getMaxMessageSize() {
+        return producer != null && producer.getConnectionHandler() != null
+                ? producer.getConnectionHandler().getMaxMessageSize() : 
Commands.DEFAULT_MAX_MESSAGE_SIZE;
+    }
 
     protected boolean isBatchFull() {
         return (maxBytesInBatch > 0 && currentBatchSizeBytes >= 
maxBytesInBatch)
-            || (maxBytesInBatch <= 0 && currentBatchSizeBytes >= 
ClientCnx.getMaxMessageSize())
+            || (maxBytesInBatch <= 0 && currentBatchSizeBytes >= 
getMaxMessageSize())
             || (maxNumMessagesInBatch > 0 && numMessagesInBatch >= 
maxNumMessagesInBatch);
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index bf8c1f9de82..fc5c3a3c679 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -101,7 +101,7 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
                 lowestSequenceId = 
Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
                 this.firstCallback = callback;
                 batchedMessageMetadataAndPayload = allocator.buffer(
-                        Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
+                        Math.min(maxBatchSize, getMaxMessageSize()));
                 
updateAndReserveBatchAllocatedSize(batchedMessageMetadataAndPayload.capacity());
                 if (msg.getMessageBuilder().hasTxnidMostBits() && 
currentTxnidMostBits == -1) {
                     currentTxnidMostBits = 
msg.getMessageBuilder().getTxnidMostBits();
@@ -272,12 +272,12 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
             op.setBatchSizeByte(encryptedPayload.readableBytes());
 
             // handle mgs size check as non-batched in 
`ProducerImpl.isMessageSizeExceeded`
-            if (op.getMessageHeaderAndPayloadSize() > 
ClientCnx.getMaxMessageSize()) {
+            if (op.getMessageHeaderAndPayloadSize() > getMaxMessageSize()) {
                 producer.semaphoreRelease(1);
                 producer.client.getMemoryLimitController().releaseMemory(
                         messages.get(0).getUncompressedSize() + 
batchAllocatedSizeBytes);
                 discard(new PulsarClientException.InvalidMessageException(
-                    "Message size is bigger than " + 
ClientCnx.getMaxMessageSize() + " bytes"));
+                    "Message size is bigger than " + getMaxMessageSize() + " 
bytes"));
                 return null;
             }
             lowestSequenceId = -1L;
@@ -285,13 +285,13 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
         }
         ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, 
getCompressedBatchMetadataAndPayload());
         updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
-        if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
+        if (encryptedPayload.readableBytes() > getMaxMessageSize()) {
             producer.semaphoreRelease(messages.size());
             messages.forEach(msg -> producer.client.getMemoryLimitController()
                     .releaseMemory(msg.getUncompressedSize()));
             
producer.client.getMemoryLimitController().releaseMemory(batchAllocatedSizeBytes);
             discard(new PulsarClientException.InvalidMessageException(
-                    "Message size is bigger than " + 
ClientCnx.getMaxMessageSize() + " bytes"));
+                    "Message size is bigger than " + getMaxMessageSize() + " 
bytes"));
             return null;
         }
         messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index b3444ae393e..938a0b4d8f6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -168,8 +168,7 @@ public class ClientCnx extends PulsarHandler {
     private volatile int numberOfRejectRequests = 0;
 
     @Getter
-    private static int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
-
+    private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
     private final int maxNumberOfRejectedRequestPerConnection;
     private final int rejectedRequestResetTimeSec = 60;
     protected final int protocolVersion;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 7700596dca3..f0f78420115 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -26,8 +26,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.HandlerState.State;
+import org.apache.pulsar.common.protocol.Commands;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,6 +39,10 @@ public class ConnectionHandler {
             AtomicReferenceFieldUpdater.newUpdater(ConnectionHandler.class, 
ClientCnx.class, "clientCnx");
     @SuppressWarnings("unused")
     private volatile ClientCnx clientCnx = null;
+    @Getter
+    @Setter
+    // Since the `clientCnx` variable will be set to null at some times, it is 
necessary to save this value here.
+    private volatile int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
 
     protected final HandlerState state;
     protected final Backoff backoff;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c09e0afe58d..6c2ded819a5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -776,6 +776,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     @Override
     public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
         previousExceptions.clear();
+        getConnectionHandler().setMaxMessageSize(cnx.getMaxMessageSize());
 
         final State state = getState();
         if (state == State.Closing || state == State.Closed) {
@@ -1896,7 +1897,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(compressionType);
         int uncompressedSize = msgMetadata.getUncompressedSize();
         int payloadSize = payload.readableBytes();
-        if (checkMaxMessageSize && payloadSize > 
ClientCnx.getMaxMessageSize()) {
+        if (checkMaxMessageSize && payloadSize > 
getConnectionHandler().getMaxMessageSize()) {
             // payload size is itself corrupted since it cannot be bigger than 
the MaxMessageSize
             log.error("[{}][{}] Got corrupted payload message size {} at {}", 
topic, subscription, payloadSize,
                     messageId);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index da73514deb3..880185f7a97 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -180,9 +180,6 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         this.userProvidedProducerName = StringUtils.isNotBlank(producerName);
         this.partitionIndex = partitionIndex;
         this.pendingMessages = createPendingMessagesQueue();
-        this.chunkMaxMessageSize = conf.getChunkMaxMessageSize() > 0
-                ? Math.min(conf.getChunkMaxMessageSize(), 
ClientCnx.getMaxMessageSize())
-                : ClientCnx.getMaxMessageSize();
         if (conf.getMaxPendingMessages() > 0) {
             this.semaphore = Optional.of(new 
Semaphore(conf.getMaxPendingMessages(), true));
         } else {
@@ -275,10 +272,16 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 .setMandatoryStop(Math.max(100, conf.getSendTimeoutMs() - 
100), TimeUnit.MILLISECONDS)
                 .create(),
             this);
-
+        setChunkMaxMessageSize();
         grabCnx();
     }
 
+    private void setChunkMaxMessageSize() {
+        this.chunkMaxMessageSize = conf.getChunkMaxMessageSize() > 0
+                ? Math.min(conf.getChunkMaxMessageSize(), getMaxMessageSize())
+                : getMaxMessageSize();
+    }
+
     protected void semaphoreRelease(final int releaseCountRequest) {
         if (semaphore.isPresent()) {
             if (!errorState) {
@@ -455,14 +458,14 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
             // validate msg-size (For batching this will be check at the batch 
completion size)
             int compressedSize = compressedPayload.readableBytes();
-            if (compressedSize > ClientCnx.getMaxMessageSize() && 
!this.conf.isChunkingEnabled()) {
+            if (compressedSize > getMaxMessageSize() && 
!this.conf.isChunkingEnabled()) {
                 compressedPayload.release();
                 String compressedStr = conf.getCompressionType() != 
CompressionType.NONE ? "Compressed" : "";
                 PulsarClientException.InvalidMessageException 
invalidMessageException =
                         new PulsarClientException.InvalidMessageException(
                                 format("The producer %s of the topic %s sends 
a %s message with %d bytes that exceeds"
                                                 + " %d bytes",
-                        producerName, topic, compressedStr, compressedSize, 
ClientCnx.getMaxMessageSize()));
+                        producerName, topic, compressedStr, compressedSize, 
getMaxMessageSize()));
                 completeCallbackAndReleaseSemaphore(uncompressedSize, 
callback, invalidMessageException);
                 return;
             }
@@ -492,19 +495,19 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         int payloadChunkSize;
         if (canAddToBatch(msg) || !conf.isChunkingEnabled()) {
             totalChunks = 1;
-            payloadChunkSize = ClientCnx.getMaxMessageSize();
+            payloadChunkSize = getMaxMessageSize();
         } else {
             // Reserve current metadata size for chunk size to avoid message 
size overflow.
             // NOTE: this is not strictly bounded, as metadata will be updated 
after chunking.
             // So there is a small chance that the final message size is 
larger than ClientCnx.getMaxMessageSize().
             // But it won't cause produce failure as broker have 10 KB padding 
space for these cases.
-            payloadChunkSize = ClientCnx.getMaxMessageSize() - 
msgMetadata.getSerializedSize();
+            payloadChunkSize = getMaxMessageSize() - 
msgMetadata.getSerializedSize();
             if (payloadChunkSize <= 0) {
                 PulsarClientException.InvalidMessageException 
invalidMessageException =
                         new PulsarClientException.InvalidMessageException(
                                 format("The producer %s of the topic %s sends 
a message with %d bytes metadata that "
                                                 + "exceeds %d bytes", 
producerName, topic,
-                                        msgMetadata.getSerializedSize(), 
ClientCnx.getMaxMessageSize()));
+                                        msgMetadata.getSerializedSize(), 
getMaxMessageSize()));
                 completeCallbackAndReleaseSemaphore(uncompressedSize, 
callback, invalidMessageException);
                 compressedPayload.release();
                 return;
@@ -1663,7 +1666,8 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
     @Override
     public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
         previousExceptions.clear();
-        chunkMaxMessageSize = Math.min(chunkMaxMessageSize, 
ClientCnx.getMaxMessageSize());
+        getConnectionHandler().setMaxMessageSize(cnx.getMaxMessageSize());
+        setChunkMaxMessageSize();
 
         final long epoch;
         synchronized (this) {
@@ -2323,11 +2327,11 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
     private boolean isMessageSizeExceeded(OpSendMsg op) {
         if (op.msg != null && !conf.isChunkingEnabled()) {
             int messageSize = op.getMessageHeaderAndPayloadSize();
-            if (messageSize > ClientCnx.getMaxMessageSize()) {
+            if (messageSize > getMaxMessageSize()) {
                 releaseSemaphoreForSendOp(op);
                 op.sendComplete(new 
PulsarClientException.InvalidMessageException(
                         format("The producer %s of the topic %s sends a 
message with %d bytes that exceeds %d bytes",
-                                producerName, topic, messageSize, 
ClientCnx.getMaxMessageSize()),
+                                producerName, topic, messageSize, 
getMaxMessageSize()),
                         op.sequenceId));
                 return true;
             }
@@ -2335,6 +2339,10 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         return false;
     }
 
+    private int getMaxMessageSize() {
+        return getConnectionHandler().getMaxMessageSize();
+    }
+
     public long getDelayInMillis() {
         OpSendMsg firstMsg = pendingMessages.peek();
         if (firstMsg != null) {

Reply via email to