This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 71688ff814a [fix] [client] Messages lost when consumer reconnect (#20695)
71688ff814a is described below

commit 71688ff814aae99a278b3baed9ba799b6a294b0c
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Tue Jul 4 18:01:24 2023 +0800

    [fix] [client] Messages lost when consumer reconnect (#20695)
---
 .../java/org/apache/pulsar/broker/service/ServerCnx.java    |  2 +-
 .../org/apache/pulsar/client/impl/ConnectionHandler.java    | 13 +++++++------
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 5b8da8dcd7e..913faed45af 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1190,7 +1190,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                 "Consumer that failed is already present on the connection");
                     } else {
                         Consumer consumer = existingConsumerFuture.getNow(null);
-                        log.info("[{}] Consumer with the same id is already created:"
+                        log.warn("[{}] Consumer with the same id is already created:"
                                         + " consumerId={}, consumer={}",
                                 remoteAddress, consumerId, consumer);
                         commandSender.sendSuccessResponse(requestId);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 365abce3b90..263507dac1d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -64,6 +64,12 @@ public class ConnectionHandler {
     }
 
     protected void grabCnx() {
+        if (!duringConnect.compareAndSet(false, true)) {
+            log.info("[{}] [{}] Skip grabbing the connection since there is a pending connection",
+                    state.topic, state.getHandlerName());
+            return;
+        }
+
         if (CLIENT_CNX_UPDATER.get(this) != null) {
             log.warn("[{}] [{}] Client cnx already set, ignoring reconnection request",
                     state.topic, state.getHandlerName());
@@ -76,11 +82,6 @@ public class ConnectionHandler {
                     state.topic, state.getHandlerName(), state.getState());
             return;
         }
-        if (!duringConnect.compareAndSet(false, true)) {
-            log.info("[{}] [{}] Skip grabbing the connection since there is a pending connection",
-                    state.topic, state.getHandlerName());
-            return;
-        }
 
         try {
             CompletableFuture<ClientCnx> cnxFuture;
@@ -123,8 +124,8 @@ public class ConnectionHandler {
     }
 
     void reconnectLater(Throwable exception) {
-        duringConnect.set(false);
         CLIENT_CNX_UPDATER.set(this, null);
+        duringConnect.set(false);
         if (!isValidStateForReconnection()) {
             log.info("[{}] [{}] Ignoring reconnection request (state: {})",
                     state.topic, state.getHandlerName(), state.getState());

Reply via email to