This is an automated email from the ASF dual-hosted git repository.
daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 708c5cc0c5f [fix][client] fix incomingMessageSize and client memory usage is negative (#23624)
708c5cc0c5f is described below
commit 708c5cc0c5f86d6c6bbdb438067122074f4de994
Author: ken <[email protected]>
AuthorDate: Fri Nov 22 09:51:02 2024 +0800
[fix][client] fix incomingMessageSize and client memory usage is negative (#23624)
Co-authored-by: fanjianye <[email protected]>
---
.../client/api/SimpleProducerConsumerTest.java | 56 +++++++++++++++++++
.../impl/AutoScaledReceiverQueueSizeTest.java | 62 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 5 ++
.../apache/pulsar/client/impl/ConsumerImpl.java | 2 +
4 files changed, 125 insertions(+)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 78d28e4b228..9e35b4f262e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4252,6 +4252,62 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
});
}
+ @Test(timeOut = 100000)
+ public void testNegativeIncomingMessageSize() throws Exception {
+ final String topicName = "persistent://my-property/my-ns/testIncomingMessageSize-" +
+ UUID.randomUUID().toString();
+ final String subName = "my-sub";
+
+ admin.topics().createPartitionedTopic(topicName, 3);
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(false)
+ .create();
+
+ final int messages = 1000;
+ List<CompletableFuture<MessageId>> messageIds = new ArrayList<>(messages);
+ for (int i = 0; i < messages; i++) {
+ messageIds.add(producer.newMessage().key(i + "").value(("Message-" + i).getBytes()).sendAsync());
+ }
+ FutureUtil.waitForAll(messageIds).get();
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subName)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+
+ Awaitility.await().untilAsserted(() -> {
+ long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
+ log.info("Check the incoming message size should greater that 0, current size is {}", size);
+ Assert.assertTrue(size > 0);
+ });
+
+
+ for (int i = 0; i < messages; i++) {
+ consumer.receive();
+ }
+
+
+ Awaitility.await().untilAsserted(() -> {
+ long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
+ log.info("Check the incoming message size should be 0, current size is {}", size);
+ Assert.assertEquals(size, 0);
+ });
+
+
+ MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl) consumer;
+ List<ConsumerImpl<byte[]>> list = multiTopicsConsumer.getConsumers();
+ for (ConsumerImpl<byte[]> subConsumer : list) {
+ long size = subConsumer.getIncomingMessageSize();
+ log.info("Check the sub consumer incoming message size should be 0, current size is {}", size);
+ Assert.assertEquals(size, 0);
+ }
+ }
@Data
@EqualsAndHashCode
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java
index 858e43e8465..5359158bf72 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java
@@ -20,14 +20,22 @@ package org.apache.pulsar.client.impl;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.BatchReceivePolicy;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -257,4 +265,58 @@ public class AutoScaledReceiverQueueSizeTest extends MockedPulsarServiceBaseTest
Awaitility.await().until(() -> consumer.getCurrentReceiverQueueSize() == currentSize * 2);
log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
}
+
+ @Test
+ public void testNegativeClientMemory() throws Exception {
+ final String topicName = "persistent://public/default/testMemory-" +
+ UUID.randomUUID().toString();
+ final String subName = "my-sub";
+
+ admin.topics().createPartitionedTopic(topicName, 3);
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(false)
+ .create();
+
+ final int messages = 1000;
+ List<CompletableFuture<MessageId>> messageIds = new ArrayList<>(messages);
+ for (int i = 0; i < messages; i++) {
+ messageIds.add(producer.newMessage().key(i + "").value(("Message-" + i).getBytes()).sendAsync());
+ }
+ FutureUtil.waitForAll(messageIds).get();
+
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subName)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .autoScaledReceiverQueueSizeEnabled(true)
+ .subscribe();
+
+
+ Awaitility.await().untilAsserted(() -> {
+ long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
+ log.info("Check the incoming message size should greater that 0, current size is {}", size);
+ Assert.assertTrue(size > 0);
+ });
+
+
+ for (int i = 0; i < messages; i++) {
+ consumer.receive();
+ }
+
+ Awaitility.await().untilAsserted(() -> {
+ long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
+ log.info("Check the incoming message size should be 0, current size is {}", size);
+ Assert.assertEquals(size, 0);
+ });
+
+
+ MemoryLimitController controller = ((PulsarClientImpl)pulsarClient).getMemoryLimitController();
+ Assert.assertEquals(controller.currentUsage(), 0);
+ Assert.assertEquals(controller.currentUsagePercent(), 0);
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 0fc906b6e7a..8c10577bc86 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -1232,6 +1232,11 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
getMemoryLimitController().ifPresent(limiter -> limiter.releaseMemory(message.size()));
}
+ protected void increaseIncomingMessageSize(final Message<?> message) {
+ INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.size());
+ getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(message.size()));
+ }
+
public long getIncomingMessageSize() {
return INCOMING_MESSAGES_SIZE_UPDATER.get(this);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 004adab56f5..ffdf4cfdc8b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1668,6 +1668,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return;
}
+ // increase incomingMessageSize here because the size would be decreased in messageProcessed() next step
+ increaseIncomingMessageSize(message);
// increase permits for available message-queue
messageProcessed(message);
// call interceptor and complete received callback