This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new f8ce46e9fd9 Fix MaxQueueSize semaphore release leak in createOpSendMsg
(#16958)
f8ce46e9fd9 is described below
commit f8ce46e9fd918d78c1261eccc72ae384b2b1e9f4
Author: lixinyang <[email protected]>
AuthorDate: Tue Aug 9 09:10:52 2022 +0800
Fix MaxQueueSize semaphore release leak in createOpSendMsg (#16958)
---
.../pulsar/client/impl/ProducerSemaphoreTest.java | 33 ++++++++++++++++++++++
.../client/impl/BatchMessageContainerImpl.java | 1 +
2 files changed, 34 insertions(+)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index 181e9d05d9c..de858c8d2bd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -58,6 +58,39 @@ public class ProducerSemaphoreTest extends
ProducerConsumerBase {
super.internalCleanup();
}
+ @Test(timeOut = 10_000)
+ public void testProducerSemaphoreInvalidMessage() throws Exception {
+ final int pendingQueueSize = 100;
+
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
pulsarClient.newProducer()
+ .topic("testProducerSemaphoreAcquire")
+ .maxPendingMessages(pendingQueueSize)
+ .enableBatching(true)
+ .create();
+
+ this.stopBroker();
+
+ Field maxMessageSizeFiled =
ClientCnx.class.getDeclaredField("maxMessageSize");
+ maxMessageSizeFiled.setAccessible(true);
+ maxMessageSizeFiled.set(null, 2);
+
+ try {
+ producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
+ Assert.fail("can not reach here");
+ } catch (PulsarClientException.InvalidMessageException ex) {
+
Assert.assertEquals(producer.getSemaphore().get().availablePermits(),
pendingQueueSize);
+ }
+
+ producer.conf.setBatchingEnabled(false);
+ try {
+ producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
+ Assert.fail("can not reach here");
+ } catch (PulsarClientException.InvalidMessageException ex) {
+
Assert.assertEquals(producer.getSemaphore().get().availablePermits(),
pendingQueueSize);
+ }
+ }
+
@Test(timeOut = 30000)
public void testProducerSemaphoreAcquireAndRelease() throws
PulsarClientException, ExecutionException, InterruptedException {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 996875a7131..e0ab2d942ca 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -199,6 +199,7 @@ class BatchMessageContainerImpl extends
AbstractBatchMessageContainer {
public OpSendMsg createOpSendMsg() throws IOException {
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata,
getCompressedBatchMetadataAndPayload());
if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
+ producer.semaphoreRelease(messages.size());
discard(new PulsarClientException.InvalidMessageException(
"Message size is bigger than " +
ClientCnx.getMaxMessageSize() + " bytes"));
return null;