This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c941b2f742d796210ba482c96bd4fb5f66dd61fa
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Wed Dec 22 07:05:48 2021 +0200

    [Broker] Fix race conditions in closing producers and consumers (#13428)

    - closing ServerCnx while producers or consumers are created can lead to
      a producer or consumer never getting removed from the topic's list of
      producers

    (cherry picked from commit 3316db5a52cdeee49bc90fe18baac28d5688bfe8)
---
 .../apache/pulsar/broker/service/ServerCnx.java | 24 ++++++++++++++--------
 1 file changed, 15 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a8d7f23..9044488 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -283,6 +283,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
         // Connection is gone, close the producers immediately
         producers.forEach((__, producerFuture) -> {
+            // prevent race conditions in completing producers
+            if (!producerFuture.isDone()
+                    && producerFuture.completeExceptionally(new IllegalStateException("Connection closed."))) {
+                return;
+            }
             if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) {
                 Producer producer = producerFuture.getNow(null);
                 producer.closeNow(true);
@@ -290,17 +295,18 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         });
 
         consumers.forEach((__, consumerFuture) -> {
-            Consumer consumer;
-            if (consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
-                consumer = consumerFuture.getNow(null);
-            } else {
+            // prevent race conditions in completing consumers
+            if (!consumerFuture.isDone()
+                    && consumerFuture.completeExceptionally(new IllegalStateException("Connection closed."))) {
                 return;
             }
-
-            try {
-                consumer.close();
-            } catch (BrokerServiceException e) {
-                log.warn("Consumer {} was already closed: {}", consumer, e);
+            if (consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
+                Consumer consumer = consumerFuture.getNow(null);
+                try {
+                    consumer.close();
+                } catch (BrokerServiceException e) {
+                    log.warn("Consumer {} was already closed: {}", consumer, e);
+                }
             }
         });
         this.service.getPulsarStats().recordConnectionClose();