This is an automated email from the ASF dual-hosted git repository.

poorbarcode pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new b431629e4f1 [fix][client] Fix failed to close consumer because of the 
error: param memorySize is a negative value (#25805)
b431629e4f1 is described below

commit b431629e4f17c2cdfece25c484f69238034ed95c
Author: fengyubiao <[email protected]>
AuthorDate: Mon May 25 20:30:23 2026 +0800

    [fix][client] Fix failed to close consumer because of the error: param 
memorySize is a negative value (#25805)
    
    (cherry picked from commit 59d1495fef46075986a92065bd64229fe0f354ea)
---
 .../java/org/apache/pulsar/client/impl/ConsumerBase.java | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 2cc50266013..65cdbf07d5b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -971,13 +971,17 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
         // synchronize redeliverUnacknowledgedMessages().
         incomingQueueLock.lock();
         try {
-            if (canEnqueueMessage(message) && incomingMessages.offer(message)) 
{
-                // After we have enqueued the messages on `incomingMessages` 
queue, we cannot touch the message
-                // instance anymore, since for pooled messages, this instance 
was possibly already been released
-                // and recycled.
+            if (canEnqueueMessage(message)) {
                 INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
-                getMemoryLimitController().ifPresent(limiter -> 
limiter.forceReserveMemory(messageSize));
-                updateAutoScaleReceiverQueueHint();
+                if (incomingMessages.offer(message)) {
+                    // After we have enqueued the messages on 
`incomingMessages` queue, we cannot touch the message
+                    // instance anymore, since for pooled messages, this 
instance has possibly already been released
+                    // and recycled.
+                    getMemoryLimitController().ifPresent(limiter -> 
limiter.forceReserveMemory(messageSize));
+                    updateAutoScaleReceiverQueueHint();
+                } else {
+                    INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, 
-messageSize);
+                }
             }
         } finally {
             incomingQueueLock.unlock();

Reply via email to