This is an automated email from the ASF dual-hosted git repository.

poorbarcode pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 53484150114 [improve][client] In cases where there is a risk of message loss, adjust the log level to error (#25854)
53484150114 is described below

commit 53484150114cea06569b4d94a84d4b3fd670baed
Author: fengyubiao <[email protected]>
AuthorDate: Sun May 24 20:24:03 2026 +0800

    [improve][client] In cases where there is a risk of message loss, adjust the log level to error (#25854)
    
    (cherry picked from commit 09035ffddada146f3ef014777ee5e5766f01f3f2)
---
 .../org/apache/pulsar/client/impl/ConsumerBase.java  | 20 +++++++++++++-------
 .../org/apache/pulsar/client/impl/ConsumerImpl.java  | 12 ++++++++++++
 2 files changed, 25 insertions(+), 7 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 4e0f4466925..a4e5bef3083 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -331,10 +331,13 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
 
     protected void completePendingReceive(CompletableFuture<Message<T>> receivedFuture, Message<T> message) {
         getInternalExecutor(message).execute(() -> {
-            if (!receivedFuture.complete(message)) {
-                log.warn("Race condition detected. receive future was already completed (cancelled={}) and message was "
-                                + "dropped. message={}",
-                        receivedFuture.isCancelled(), message);
+            if (!receivedFuture.complete(message) && getState() != State.Closing && getState() != State.Closed) {
+                log.error("Race condition detected, receive future was already completed and message was dropped."
+                        + " In other words, the message was dropped internally, the client-side will encounter a"
+                        + " crucial issue: this message will never be consumed until the consumer is restarted or"
+                        + " the topic is unloaded. Under normal circumstances, this won't happen. It only occurs when"
+                        + " user itself has completed the completable future object returned by"
+                        + " \"consumer.receiveAsync()\". message={}, cancelled={}", message, receivedFuture.isCancelled());
             }
         });
     }
@@ -1098,9 +1101,12 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
 
     protected void completePendingBatchReceive(CompletableFuture<Messages<T>> future, Messages<T> messages) {
         if (!future.complete(messages)) {
-            log.warn("Race condition detected. batch receive future was already completed (cancelled={}) and messages"
-                            + " were dropped. messages={}",
-                    future.isCancelled(), messages);
+            log.warn("Race condition detected, receive future was already completed and message was dropped."
+                    + " In other words, the message was dropped internally, the client-side will encounter a"
+                    + " crucial issue: these message will never be consumed until the consumer is restarted or"
+                    + " the topic is unloaded. Under normal circumstances, this won't happen. It only occurs when"
+                    + " user itself has completed the completable future object returned by"
+                    + " \"consumer.batchReceiveAsync()\". messages={}, cancelled={}", messages, future.isCancelled());
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index cd2a205c41d..1038d84d89c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1701,12 +1701,24 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
      */
     void notifyPendingReceivedCallback(final Message<T> message, Exception exception) {
         if (pendingReceives.isEmpty()) {
+            if (getState() != State.Closing && getState() != State.Closed) {
+                log.error("If you received this log, it means that you encountered a bug: a message was"
+                        + " dropped internally, the client-side will encounter a crucial issue: this message will"
+                        + " never be consumed until the consumer is restarted or the topic is unloaded. message={},"
+                        + " pendingReceives-size={}", message, pendingReceives.size());
+            }
             return;
         }
 
         // fetch receivedCallback from queue
         final CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
         if (receivedFuture == null) {
+            if (getState() != State.Closing && getState() != State.Closed) {
+                log.error("The pendingReceives pulled out a null conpletableFuture object. If you received this log,"
+                        + " it means that you encountered a bug: a message was"
+                        + " dropped internally, the client-side will encounter a crucial issue: this message will never"
+                        + " be consumed until the consumer is restarted or the topic is unloaded. message={}", message);
+            }
             return;
         }
 

Reply via email to