This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 71598c11637 [fix][client]Fixed getting an incorrect `maxMessageSize` value when accessing multiple clusters in the same process (#22306) 71598c11637 is described below commit 71598c1163730defb9fdea85e813fe863c3fe4d2 Author: atomchen <492672...@qq.com> AuthorDate: Thu Mar 21 17:30:40 2024 +0800 [fix][client]Fixed getting an incorrect `maxMessageSize` value when accessing multiple clusters in the same process (#22306) Co-authored-by: atomchchen <atomchc...@tencent.com> --- .../client/api/SimpleProducerConsumerTest.java | 6 ++-- .../client/impl/ProducerMemoryLimitTest.java | 12 ++++---- .../pulsar/client/impl/ProducerSemaphoreTest.java | 18 ++++++------ .../client/impl/AbstractBatchMessageContainer.java | 9 ++++-- .../client/impl/BatchMessageContainerImpl.java | 10 +++---- .../org/apache/pulsar/client/impl/ClientCnx.java | 3 +- .../pulsar/client/impl/ConnectionHandler.java | 7 +++++ .../apache/pulsar/client/impl/ConsumerImpl.java | 3 +- .../apache/pulsar/client/impl/ProducerImpl.java | 32 ++++++++++++++-------- 9 files changed, 60 insertions(+), 40 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 4536bda907b..4c106d39e7a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -93,13 +93,13 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.impl.ClientBuilderImpl; -import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.ConsumerBase; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.PartitionedProducerImpl; +import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; @@ -3906,11 +3906,11 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { .topic("persistent://my-property/my-ns/my-topic2"); @Cleanup - Producer<byte[]> producer = producerBuilder.create(); + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)producerBuilder.create(); List<Future<MessageId>> futures = new ArrayList<>(); // Asynchronously produce messages - byte[] message = new byte[ClientCnx.getMaxMessageSize() + 1]; + byte[] message = new byte[producer.getConnectionHandler().getMaxMessageSize() + 1]; for (int i = 0; i < maxPendingMessages + 10; i++) { Future<MessageId> future = producer.sendAsync(message); try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java index d776fdb0ed9..55a67ae644d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import io.netty.buffer.ByteBufAllocator; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; @@ -33,7 +34,6 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SizeUnit; -import org.mockito.MockedStatic; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -69,10 +69,12 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase { .create(); this.stopBroker(); try { - try (MockedStatic<ClientCnx> mockedStatic = Mockito.mockStatic(ClientCnx.class)) { - mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(8); - producer.send("memory-test".getBytes(StandardCharsets.UTF_8)); - } + ConnectionHandler connectionHandler = Mockito.spy(producer.getConnectionHandler()); + Field field = producer.getClass().getDeclaredField("connectionHandler"); + field.setAccessible(true); + field.set(producer, connectionHandler); + when(connectionHandler.getMaxMessageSize()).thenReturn(8); + producer.send("memory-test".getBytes(StandardCharsets.UTF_8)); throw new IllegalStateException("can not reach here"); } catch (PulsarClientException.InvalidMessageException ex) { PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java index 2f8cb655401..42f431e0b9b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java @@ -22,6 +22,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import io.netty.buffer.ByteBufAllocator; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; @@ -33,7 +34,6 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.util.FutureUtil; -import org.mockito.MockedStatic; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -72,13 +72,14 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase { .maxPendingMessages(pendingQueueSize) .enableBatching(true) .create(); - this.stopBroker(); try { - try (MockedStatic<ClientCnx> mockedStatic = Mockito.mockStatic(ClientCnx.class)) { - mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(2); - producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8)); - } + ConnectionHandler connectionHandler = Mockito.spy(producer.getConnectionHandler()); + Field field = producer.getClass().getDeclaredField("connectionHandler"); + field.setAccessible(true); + field.set(producer, connectionHandler); + when(connectionHandler.getMaxMessageSize()).thenReturn(2); + producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8)); throw new IllegalStateException("can not reach here"); } catch (PulsarClientException.InvalidMessageException ex) { Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); @@ -86,10 +87,7 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase { producer.conf.setBatchingEnabled(false); try { - try (MockedStatic<ClientCnx> mockedStatic = Mockito.mockStatic(ClientCnx.class)) { - mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(2); - producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8)); - } + producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8)); throw new IllegalStateException("can not reach here"); } catch (PulsarClientException.InvalidMessageException ex) { Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java index 8c17d8fcb25..3ba7866350a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java @@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.api.proto.CompressionType; import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; +import org.apache.pulsar.common.protocol.Commands; /** * Batch message container framework. @@ -59,14 +60,18 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta public boolean haveEnoughSpace(MessageImpl<?> msg) { int messageSize = msg.getDataBuffer().readableBytes(); return ( - (maxBytesInBatch <= 0 && (messageSize + currentBatchSizeBytes) <= ClientCnx.getMaxMessageSize()) + (maxBytesInBatch <= 0 && (messageSize + currentBatchSizeBytes) <= getMaxMessageSize()) || (maxBytesInBatch > 0 && (messageSize + currentBatchSizeBytes) <= maxBytesInBatch) ) && (maxNumMessagesInBatch <= 0 || numMessagesInBatch < maxNumMessagesInBatch); } + protected int getMaxMessageSize() { + return producer != null && producer.getConnectionHandler() != null + ? producer.getConnectionHandler().getMaxMessageSize() : Commands.DEFAULT_MAX_MESSAGE_SIZE; + } protected boolean isBatchFull() { return (maxBytesInBatch > 0 && currentBatchSizeBytes >= maxBytesInBatch) - || (maxBytesInBatch <= 0 && currentBatchSizeBytes >= ClientCnx.getMaxMessageSize()) + || (maxBytesInBatch <= 0 && currentBatchSizeBytes >= getMaxMessageSize()) || (maxNumMessagesInBatch > 0 && numMessagesInBatch >= maxNumMessagesInBatch); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index bf8c1f9de82..fc5c3a3c679 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -101,7 +101,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { lowestSequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder()); this.firstCallback = callback; batchedMessageMetadataAndPayload = allocator.buffer( - Math.min(maxBatchSize, ClientCnx.getMaxMessageSize())); + Math.min(maxBatchSize, getMaxMessageSize())); updateAndReserveBatchAllocatedSize(batchedMessageMetadataAndPayload.capacity()); if (msg.getMessageBuilder().hasTxnidMostBits() && currentTxnidMostBits == -1) { currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits(); @@ -272,12 +272,12 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { op.setBatchSizeByte(encryptedPayload.readableBytes()); // handle mgs size check as non-batched in `ProducerImpl.isMessageSizeExceeded` - if (op.getMessageHeaderAndPayloadSize() > ClientCnx.getMaxMessageSize()) { + if (op.getMessageHeaderAndPayloadSize() > getMaxMessageSize()) { producer.semaphoreRelease(1); producer.client.getMemoryLimitController().releaseMemory( messages.get(0).getUncompressedSize() + batchAllocatedSizeBytes); discard(new PulsarClientException.InvalidMessageException( - "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes")); + "Message size is bigger than " + getMaxMessageSize() + " bytes")); return null; } lowestSequenceId = -1L; @@ -285,13 +285,13 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { } ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); updateAndReserveBatchAllocatedSize(encryptedPayload.capacity()); - if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) { + if (encryptedPayload.readableBytes() > getMaxMessageSize()) { producer.semaphoreRelease(messages.size()); messages.forEach(msg -> producer.client.getMemoryLimitController() .releaseMemory(msg.getUncompressedSize())); producer.client.getMemoryLimitController().releaseMemory(batchAllocatedSizeBytes); discard(new PulsarClientException.InvalidMessageException( - "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes")); + "Message size is bigger than " + getMaxMessageSize() + " bytes")); return null; } messageMetadata.setNumMessagesInBatch(numMessagesInBatch); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index b3444ae393e..938a0b4d8f6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -168,8 +168,7 @@ public class ClientCnx extends PulsarHandler { private volatile int numberOfRejectRequests = 0; @Getter - private static int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE; - + private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE; private final int maxNumberOfRejectedRequestPerConnection; private final int rejectedRequestResetTimeSec = 60; protected final int protocolVersion; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java index 7700596dca3..f0f78420115 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java @@ -26,8 +26,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import lombok.Getter; +import lombok.Setter; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.HandlerState.State; +import org.apache.pulsar.common.protocol.Commands; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +39,10 @@ public class ConnectionHandler { AtomicReferenceFieldUpdater.newUpdater(ConnectionHandler.class, ClientCnx.class, "clientCnx"); @SuppressWarnings("unused") private volatile ClientCnx clientCnx = null; + @Getter + @Setter + // Since the `clientCnx` variable will be set to null at some times, it is necessary to save this value here. + private volatile int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE; protected final HandlerState state; protected final Backoff backoff; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index c09e0afe58d..6c2ded819a5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -776,6 +776,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle @Override public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) { previousExceptions.clear(); + getConnectionHandler().setMaxMessageSize(cnx.getMaxMessageSize()); final State state = getState(); if (state == State.Closing || state == State.Closed) { @@ -1896,7 +1897,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType); int uncompressedSize = msgMetadata.getUncompressedSize(); int payloadSize = payload.readableBytes(); - if (checkMaxMessageSize && payloadSize > ClientCnx.getMaxMessageSize()) { + if (checkMaxMessageSize && payloadSize > getConnectionHandler().getMaxMessageSize()) { // payload size is itself corrupted since it cannot be bigger than the MaxMessageSize log.error("[{}][{}] Got corrupted payload message size {} at {}", topic, subscription, payloadSize, messageId); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index da73514deb3..880185f7a97 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -180,9 +180,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne this.userProvidedProducerName = StringUtils.isNotBlank(producerName); this.partitionIndex = partitionIndex; this.pendingMessages = createPendingMessagesQueue(); - this.chunkMaxMessageSize = conf.getChunkMaxMessageSize() > 0 - ? Math.min(conf.getChunkMaxMessageSize(), ClientCnx.getMaxMessageSize()) - : ClientCnx.getMaxMessageSize(); if (conf.getMaxPendingMessages() > 0) { this.semaphore = Optional.of(new Semaphore(conf.getMaxPendingMessages(), true)); } else { @@ -275,10 +272,16 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne .setMandatoryStop(Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS) .create(), this); - + setChunkMaxMessageSize(); grabCnx(); } + private void setChunkMaxMessageSize() { + this.chunkMaxMessageSize = conf.getChunkMaxMessageSize() > 0 + ? Math.min(conf.getChunkMaxMessageSize(), getMaxMessageSize()) + : getMaxMessageSize(); + } + protected void semaphoreRelease(final int releaseCountRequest) { if (semaphore.isPresent()) { if (!errorState) { @@ -455,14 +458,14 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne // validate msg-size (For batching this will be check at the batch completion size) int compressedSize = compressedPayload.readableBytes(); - if (compressedSize > ClientCnx.getMaxMessageSize() && !this.conf.isChunkingEnabled()) { + if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) { compressedPayload.release(); String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : ""; PulsarClientException.InvalidMessageException invalidMessageException = new PulsarClientException.InvalidMessageException( format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds" + " %d bytes", - producerName, topic, compressedStr, compressedSize, ClientCnx.getMaxMessageSize())); + producerName, topic, compressedStr, compressedSize, getMaxMessageSize())); completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException); return; } @@ -492,19 +495,19 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne int payloadChunkSize; if (canAddToBatch(msg) || !conf.isChunkingEnabled()) { totalChunks = 1; - payloadChunkSize = ClientCnx.getMaxMessageSize(); + payloadChunkSize = getMaxMessageSize(); } else { // Reserve current metadata size for chunk size to avoid message size overflow. // NOTE: this is not strictly bounded, as metadata will be updated after chunking. // So there is a small chance that the final message size is larger than ClientCnx.getMaxMessageSize(). // But it won't cause produce failure as broker have 10 KB padding space for these cases. - payloadChunkSize = ClientCnx.getMaxMessageSize() - msgMetadata.getSerializedSize(); + payloadChunkSize = getMaxMessageSize() - msgMetadata.getSerializedSize(); if (payloadChunkSize <= 0) { PulsarClientException.InvalidMessageException invalidMessageException = new PulsarClientException.InvalidMessageException( format("The producer %s of the topic %s sends a message with %d bytes metadata that " + "exceeds %d bytes", producerName, topic, - msgMetadata.getSerializedSize(), ClientCnx.getMaxMessageSize())); + msgMetadata.getSerializedSize(), getMaxMessageSize())); completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException); compressedPayload.release(); return; @@ -1663,7 +1666,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne @Override public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) { previousExceptions.clear(); - chunkMaxMessageSize = Math.min(chunkMaxMessageSize, ClientCnx.getMaxMessageSize()); + getConnectionHandler().setMaxMessageSize(cnx.getMaxMessageSize()); + setChunkMaxMessageSize(); final long epoch; synchronized (this) { @@ -2323,11 +2327,11 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne private boolean isMessageSizeExceeded(OpSendMsg op) { if (op.msg != null && !conf.isChunkingEnabled()) { int messageSize = op.getMessageHeaderAndPayloadSize(); - if (messageSize > ClientCnx.getMaxMessageSize()) { + if (messageSize > getMaxMessageSize()) { releaseSemaphoreForSendOp(op); op.sendComplete(new PulsarClientException.InvalidMessageException( format("The producer %s of the topic %s sends a message with %d bytes that exceeds %d bytes", - producerName, topic, messageSize, ClientCnx.getMaxMessageSize()), + producerName, topic, messageSize, getMaxMessageSize()), op.sequenceId)); return true; } @@ -2335,6 +2339,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne return false; } + private int getMaxMessageSize() { + return getConnectionHandler().getMaxMessageSize(); + } + public long getDelayInMillis() { OpSendMsg firstMsg = pendingMessages.peek(); if (firstMsg != null) {