This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 71688ff814a [fix] [client] Messages lost when consumer reconnect (#20695)
71688ff814a is described below

commit 71688ff814aae99a278b3baed9ba799b6a294b0c
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Tue Jul 4 18:01:24 2023 +0800

    [fix] [client] Messages lost when consumer reconnect (#20695)
---
 .../java/org/apache/pulsar/broker/service/ServerCnx.java |  2 +-
 .../org/apache/pulsar/client/impl/ConnectionHandler.java | 13 +++++++------
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 5b8da8dcd7e..913faed45af 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1190,7 +1190,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                             "Consumer that failed is already present on the connection");
                 } else {
                     Consumer consumer = existingConsumerFuture.getNow(null);
-                    log.info("[{}] Consumer with the same id is already created:"
+                    log.warn("[{}] Consumer with the same id is already created:"
                              + " consumerId={}, consumer={}",
                              remoteAddress, consumerId, consumer);
                     commandSender.sendSuccessResponse(requestId);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 365abce3b90..263507dac1d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -64,6 +64,12 @@ public class ConnectionHandler {
     }
 
     protected void grabCnx() {
+        if (!duringConnect.compareAndSet(false, true)) {
+            log.info("[{}] [{}] Skip grabbing the connection since there is a pending connection",
+                    state.topic, state.getHandlerName());
+            return;
+        }
+
         if (CLIENT_CNX_UPDATER.get(this) != null) {
             log.warn("[{}] [{}] Client cnx already set, ignoring reconnection request",
                     state.topic, state.getHandlerName());
@@ -76,11 +82,6 @@ public class ConnectionHandler {
                     state.topic, state.getHandlerName(), state.getState());
             return;
         }
-        if (!duringConnect.compareAndSet(false, true)) {
-            log.info("[{}] [{}] Skip grabbing the connection since there is a pending connection",
-                    state.topic, state.getHandlerName());
-            return;
-        }
 
         try {
             CompletableFuture<ClientCnx> cnxFuture;
@@ -123,8 +124,8 @@ public class ConnectionHandler {
     }
 
     void reconnectLater(Throwable exception) {
-        duringConnect.set(false);
         CLIENT_CNX_UPDATER.set(this, null);
+        duringConnect.set(false);
         if (!isValidStateForReconnection()) {
             log.info("[{}] [{}] Ignoring reconnection request (state: {})",
                     state.topic, state.getHandlerName(), state.getState());