This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new f8ce46e9fd9 Fix MaxQueueSize semaphore release leak in createOpSendMsg 
(#16958)
f8ce46e9fd9 is described below

commit f8ce46e9fd918d78c1261eccc72ae384b2b1e9f4
Author: lixinyang <[email protected]>
AuthorDate: Tue Aug 9 09:10:52 2022 +0800

    Fix MaxQueueSize semaphore release leak in createOpSendMsg (#16958)
---
 .../pulsar/client/impl/ProducerSemaphoreTest.java  | 33 ++++++++++++++++++++++
 .../client/impl/BatchMessageContainerImpl.java     |  1 +
 2 files changed, 34 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index 181e9d05d9c..de858c8d2bd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -58,6 +58,39 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    @Test(timeOut = 10_000)
+    public void testProducerSemaphoreInvalidMessage() throws Exception {
+        final int pendingQueueSize = 100;
+
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer()
+                .topic("testProducerSemaphoreAcquire")
+                .maxPendingMessages(pendingQueueSize)
+                .enableBatching(true)
+                .create();
+
+        this.stopBroker();
+
+        Field maxMessageSizeFiled = 
ClientCnx.class.getDeclaredField("maxMessageSize");
+        maxMessageSizeFiled.setAccessible(true);
+        maxMessageSizeFiled.set(null, 2);
+
+        try {
+            producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
+            Assert.fail("can not reach here");
+        } catch (PulsarClientException.InvalidMessageException ex) {
+            
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize);
+        }
+
+        producer.conf.setBatchingEnabled(false);
+        try {
+            producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
+            Assert.fail("can not reach here");
+        } catch (PulsarClientException.InvalidMessageException ex) {
+            
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize);
+        }
+    }
+
     @Test(timeOut = 30000)
     public void testProducerSemaphoreAcquireAndRelease() throws 
PulsarClientException, ExecutionException, InterruptedException {
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 996875a7131..e0ab2d942ca 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -199,6 +199,7 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
     public OpSendMsg createOpSendMsg() throws IOException {
         ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, 
getCompressedBatchMetadataAndPayload());
         if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
+            producer.semaphoreRelease(messages.size());
             discard(new PulsarClientException.InvalidMessageException(
                     "Message size is bigger than " + 
ClientCnx.getMaxMessageSize() + " bytes"));
             return null;

Reply via email to