This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c941b2f742d796210ba482c96bd4fb5f66dd61fa
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Wed Dec 22 07:05:48 2021 +0200

    [Broker] Fix race conditions in closing producers and consumers (#13428)
    
    - closing ServerCnx while producers or consumers are being created can
      lead to a producer or consumer never getting removed from the topic's
      list of producers
    
    (cherry picked from commit 3316db5a52cdeee49bc90fe18baac28d5688bfe8)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 24 ++++++++++++++--------
 1 file changed, 15 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a8d7f23..9044488 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -283,6 +283,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
         // Connection is gone, close the producers immediately
         producers.forEach((__, producerFuture) -> {
+            // prevent race conditions in completing producers
+            if (!producerFuture.isDone()
+                    && producerFuture.completeExceptionally(new IllegalStateException("Connection closed."))) {
+                return;
+            }
             if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) {
                 Producer producer = producerFuture.getNow(null);
                 producer.closeNow(true);
@@ -290,17 +295,18 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         });
 
         consumers.forEach((__, consumerFuture) -> {
-            Consumer consumer;
-            if (consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
-                consumer = consumerFuture.getNow(null);
-            } else {
+            // prevent race conditions in completing consumers
+            if (!consumerFuture.isDone()
+                    && consumerFuture.completeExceptionally(new IllegalStateException("Connection closed."))) {
                 return;
             }
-
-            try {
-                consumer.close();
-            } catch (BrokerServiceException e) {
-                log.warn("Consumer {} was already closed: {}", consumer, e);
+            if (consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
+                Consumer consumer = consumerFuture.getNow(null);
+                try {
+                    consumer.close();
+                } catch (BrokerServiceException e) {
+                    log.warn("Consumer {} was already closed: {}", consumer, e);
+                }
             }
         });
         this.service.getPulsarStats().recordConnectionClose();

Reply via email to