This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 59d1495fef4 [fix][client] Fix failed to close consumer because of the 
error: param memorySize is a negative value (#25805)
59d1495fef4 is described below

commit 59d1495fef46075986a92065bd64229fe0f354ea
Author: fengyubiao <[email protected]>
AuthorDate: Mon May 25 20:30:23 2026 +0800

    [fix][client] Fix failed to close consumer because of the error: param 
memorySize is a negative value (#25805)
---
 .../java/org/apache/pulsar/client/impl/ConsumerBase.java | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index fe9ec2d59c4..ddf0d1a219d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -991,13 +991,17 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
         // synchronize redeliverUnacknowledgedMessages().
         incomingQueueLock.lock();
         try {
-            if (canEnqueueMessage(message) && incomingMessages.offer(message)) 
{
-                // After we have enqueued the messages on `incomingMessages` 
queue, we cannot touch the message
-                // instance anymore, since for pooled messages, this instance 
was possibly already been released
-                // and recycled.
+            if (canEnqueueMessage(message)) {
                 INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
-                getMemoryLimitController().ifPresent(limiter -> 
limiter.forceReserveMemory(messageSize));
-                updateAutoScaleReceiverQueueHint();
+                if (incomingMessages.offer(message)) {
+                    // After we have enqueued the messages on 
`incomingMessages` queue, we cannot touch the message
+                    // instance anymore, since for pooled messages, this 
instance was possibly already been released
+                    // and recycled.
+                    getMemoryLimitController().ifPresent(limiter -> 
limiter.forceReserveMemory(messageSize));
+                    updateAutoScaleReceiverQueueHint();
+                } else {
+                    INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, 
-messageSize);
+                }
             }
         } finally {
             incomingQueueLock.unlock();

Reply via email to