This is an automated email from the ASF dual-hosted git repository.
poorbarcode pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 53484150114 [improve][client] In cases where there is a risk of
message loss, adjust the log level to error (#25854)
53484150114 is described below
commit 53484150114cea06569b4d94a84d4b3fd670baed
Author: fengyubiao <[email protected]>
AuthorDate: Sun May 24 20:24:03 2026 +0800
[improve][client] In cases where there is a risk of message loss, adjust
the log level to error (#25854)
(cherry picked from commit 09035ffddada146f3ef014777ee5e5766f01f3f2)
---
.../org/apache/pulsar/client/impl/ConsumerBase.java | 20 +++++++++++++-------
.../org/apache/pulsar/client/impl/ConsumerImpl.java | 12 ++++++++++++
2 files changed, 25 insertions(+), 7 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 4e0f4466925..a4e5bef3083 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -331,10 +331,13 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
protected void completePendingReceive(CompletableFuture<Message<T>>
receivedFuture, Message<T> message) {
getInternalExecutor(message).execute(() -> {
- if (!receivedFuture.complete(message)) {
- log.warn("Race condition detected. receive future was already
completed (cancelled={}) and message was "
- + "dropped. message={}",
- receivedFuture.isCancelled(), message);
+ if (!receivedFuture.complete(message) && getState() !=
State.Closing && getState() != State.Closed) {
+ log.error("Race condition detected, receive future was already
completed and message was dropped."
+ + " In other words, the message was dropped
internally, the client-side will encounter a"
+ + " crucial issue: this message will never be consumed
until the consumer is restarted or"
+ + " the topic is unloaded. Under normal circumstances,
this won't happen. It only occurs when"
+ + " user itself has completed the completable future
object returned by"
+ + " \"consumer.receiveAsync()\". message={},
cancelled={}", message, receivedFuture.isCancelled());
}
});
}
@@ -1098,9 +1101,12 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
protected void completePendingBatchReceive(CompletableFuture<Messages<T>>
future, Messages<T> messages) {
if (!future.complete(messages)) {
- log.warn("Race condition detected. batch receive future was
already completed (cancelled={}) and messages"
- + " were dropped. messages={}",
- future.isCancelled(), messages);
+ log.warn("Race condition detected, receive future was already
completed and message was dropped."
+ + " In other words, the message was dropped internally,
the client-side will encounter a"
+ + " crucial issue: these message will never be consumed
until the consumer is restarted or"
+ + " the topic is unloaded. Under normal circumstances,
this won't happen. It only occurs when"
+ + " user itself has completed the completable future
object returned by"
+ + " \"consumer.batchReceiveAsync()\". messages={},
cancelled={}", messages, future.isCancelled());
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index cd2a205c41d..1038d84d89c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1701,12 +1701,24 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
*/
void notifyPendingReceivedCallback(final Message<T> message, Exception
exception) {
if (pendingReceives.isEmpty()) {
+ if (getState() != State.Closing && getState() != State.Closed) {
+ log.error("If you received this log, it means that you
encountered a bug: a message was"
+ + " dropped internally, the client-side will encounter
a crucial issue: this message will"
+ + " never be consumed until the consumer is restarted
or the topic is unloaded. message={},"
+ + " pendingReceives-size={}", message,
pendingReceives.size());
+ }
return;
}
// fetch receivedCallback from queue
final CompletableFuture<Message<T>> receivedFuture =
nextPendingReceive();
if (receivedFuture == null) {
+ if (getState() != State.Closing && getState() != State.Closed) {
+ log.error("The pendingReceives pulled out a null
conpletableFuture object. If you received this log,"
+ + " it means that you encountered a bug: a message was"
+ + " dropped internally, the client-side will encounter
a crucial issue: this message will never"
+ + " be consumed until the consumer is restarted or the
topic is unloaded. message={}", message);
+ }
return;
}