This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ad6edd5ad98f9c706013e4882ca1c71ed3e66d70
Author: lipenghui <peng...@apache.org>
AuthorDate: Tue Jul 28 13:57:46 2020 +0800

    Fix race condition on close consumer while reconnect to broker. (#7589)
    
    ### Modifications
    
    Add state check when connection opened of the consumer. If the consumer 
state is closing or closed, we don’t need to send the subscribe command
    
    (cherry picked from commit 0b37b0c76dd2faaa9b8cc8d5b316ff35a307cfc1)
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e0f1bcb..1757f1e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -728,6 +728,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     @Override
     public void connectionOpened(final ClientCnx cnx) {
+        if (getState() == State.Closing || getState() == State.Closed) {
+            setState(State.Closed);
+            closeConsumerTasks();
+            client.cleanupConsumer(this);
+            failPendingReceive();
+            clearReceiverQueue();
+            return;
+        }
         setClientCnx(cnx);
         cnx.registerConsumer(consumerId, this);
 

Reply via email to