This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ad6edd5ad98f9c706013e4882ca1c71ed3e66d70 Author: lipenghui <peng...@apache.org> AuthorDate: Tue Jul 28 13:57:46 2020 +0800 Fix race condition on close consumer while reconnect to broker. (#7589) ### Modifications Add state check when connection opened of the consumer. If the consumer state is closing or closed, we don’t need to send the subscribe command (cherry picked from commit 0b37b0c76dd2faaa9b8cc8d5b316ff35a307cfc1) --- .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index e0f1bcb..1757f1e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -728,6 +728,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle @Override public void connectionOpened(final ClientCnx cnx) { + if (getState() == State.Closing || getState() == State.Closed) { + setState(State.Closed); + closeConsumerTasks(); + client.cleanupConsumer(this); + failPendingReceive(); + clearReceiverQueue(); + return; + } setClientCnx(cnx); cnx.registerConsumer(consumerId, this);