This is an automated email from the ASF dual-hosted git repository.
poorbarcode pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new b431629e4f1 [fix][client] Fix failed to close consumer because of the
error: param memorySize is a negative value (#25805)
b431629e4f1 is described below
commit b431629e4f17c2cdfece25c484f69238034ed95c
Author: fengyubiao <[email protected]>
AuthorDate: Mon May 25 20:30:23 2026 +0800
[fix][client] Fix failed to close consumer because of the error: param
memorySize is a negative value (#25805)
(cherry picked from commit 59d1495fef46075986a92065bd64229fe0f354ea)
---
.../java/org/apache/pulsar/client/impl/ConsumerBase.java | 16 ++++++++++------
1 file changed, 10 insertions(+), 6 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 2cc50266013..65cdbf07d5b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -971,13 +971,17 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
// synchronize redeliverUnacknowledgedMessages().
incomingQueueLock.lock();
try {
- if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
- // After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message
- // instance anymore, since for pooled messages, this instance was possibly already been released
- // and recycled.
+ if (canEnqueueMessage(message)) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
- getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
- updateAutoScaleReceiverQueueHint();
+ if (incomingMessages.offer(message)) {
+ // After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message
+ // instance anymore, since for pooled messages, this instance was possibly already been released
+ // and recycled.
+ getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
+ updateAutoScaleReceiverQueueHint();
+ } else {
+ INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -messageSize);
+ }
}
} finally {
incomingQueueLock.unlock();