This is an automated email from the ASF dual-hosted git repository.

poorbarcode pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new bc56bfe97f8 [improve][client] In cases where there is a risk of 
message loss, adjust the log level to error (#25854)
bc56bfe97f8 is described below

commit bc56bfe97f8df3b083701ba2689f69276592590c
Author: fengyubiao <[email protected]>
AuthorDate: Sun May 24 20:24:03 2026 +0800

    [improve][client] In cases where there is a risk of message loss, adjust 
the log level to error (#25854)
    
    (cherry picked from commit 09035ffddada146f3ef014777ee5e5766f01f3f2)
---
 .../org/apache/pulsar/client/impl/ConsumerBase.java | 21 ++++++++++++++-------
 .../org/apache/pulsar/client/impl/ConsumerImpl.java | 12 ++++++++++++
 2 files changed, 26 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index aed525c9eee..8137f2386a7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -336,10 +336,14 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
 
     protected void completePendingReceive(CompletableFuture<Message<T>> 
receivedFuture, Message<T> message) {
         getInternalExecutor(message).execute(() -> {
-            if (!receivedFuture.complete(message)) {
-                log.warn("Race condition detected. receive future was already 
completed (cancelled={}) and message was "
-                                + "dropped. message={}",
-                        receivedFuture.isCancelled(), message);
+            if (!receivedFuture.complete(message) && getState() != 
State.Closing && getState() != State.Closed) {
+                log.error("Race condition detected, receive future was already 
completed and message was dropped."
+                        + " In other words, the message was dropped 
internally, the client-side will encounter a"
+                        + " crucial issue: this message will never be consumed 
until the consumer is restarted or"
+                        + " the topic is unloaded. Under normal circumstances, 
this won't happen. It only occurs when"
+                        + " user itself has completed the completable future 
object returned by"
+                        + " \"consumer.receiveAsync()\". message={}, 
cancelled={}", message,
+                        receivedFuture.isCancelled());
             }
         });
     }
@@ -1103,9 +1107,12 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
 
     protected void completePendingBatchReceive(CompletableFuture<Messages<T>> 
future, Messages<T> messages) {
         if (!future.complete(messages)) {
-            log.warn("Race condition detected. batch receive future was 
already completed (cancelled={}) and messages"
-                            + " were dropped. messages={}",
-                    future.isCancelled(), messages);
+            log.warn("Race condition detected, receive future was already 
completed and message was dropped."
+                    + " In other words, the message was dropped internally, 
the client-side will encounter a"
+                    + " crucial issue: these messages will never be consumed 
until the consumer is restarted or"
+                    + " the topic is unloaded. Under normal circumstances, 
this won't happen. It only occurs when"
+                    + " user itself has completed the completable future 
object returned by"
+                    + " \"consumer.batchReceiveAsync()\". messages={}, 
cancelled={}", messages, future.isCancelled());
         }
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 1b844d120b3..5c1d9499226 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1722,12 +1722,24 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
      */
     void notifyPendingReceivedCallback(final Message<T> message, Exception 
exception) {
         if (pendingReceives.isEmpty()) {
+            if (getState() != State.Closing && getState() != State.Closed) {
+                log.error("If you received this log, it means that you 
encountered a bug: a message was"
+                        + " dropped internally, the client-side will encounter 
a crucial issue: this message will"
+                        + " never be consumed until the consumer is restarted 
or the topic is unloaded. message={},"
+                        + " pendingReceives-size={}", message, 
pendingReceives.size());
+            }
             return;
         }
 
         // fetch receivedCallback from queue
         final CompletableFuture<Message<T>> receivedFuture = 
nextPendingReceive();
         if (receivedFuture == null) {
+            if (getState() != State.Closing && getState() != State.Closed) {
+                log.error("The pendingReceives pulled out a null 
completableFuture object. If you received this log,"
+                        + " it means that you encountered a bug: a message was"
+                        + " dropped internally, the client-side will encounter 
a crucial issue: this message will never"
+                        + " be consumed until the consumer is restarted or the 
topic is unloaded. message={}", message);
+            }
             return;
         }
 

Reply via email to