This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2ce5a40d2f06a64867e139e70289270abf437d7d
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Jul 22 21:09:49 2025 +0800

    [fix][client] Close orphan producer or consumer when the creation is interrupted (#24539)
    
    (cherry picked from commit de33f3bc8d0cc142b9f20acf78ccf44202b9bd9c)
---
 .../pulsar/client/impl/ClientInterruptTest.java    | 188 +++++++++++++++++++++
 .../pulsar/client/impl/ConsumerCloseTest.java      |  46 -----
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  26 +--
 .../pulsar/client/impl/ProducerBuilderImpl.java    |   2 +-
 .../pulsar/client/impl/ReaderBuilderImpl.java      |   2 +-
 .../org/apache/pulsar/common/util/FutureUtil.java  |  29 ++++
 6 files changed, 220 insertions(+), 73 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientInterruptTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientInterruptTest.java
new file mode 100644
index 00000000000..d6a86fc7959
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientInterruptTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.intercept.MockBrokerInterceptor;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.Producer;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class ClientInterruptTest extends ProducerConsumerBase {
+
+    private final CreationInterceptor interceptor = new CreationInterceptor();
+    private int index = 0;
+    private PulsarClientImpl client;
+    private String topic;
+    private ExecutorService executor;
+    private CompletableFuture<Void> delayTriggered;
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+        client = (PulsarClientImpl) 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        pulsar.getBrokerService().setInterceptor(interceptor);
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @BeforeMethod
+    public void setupTopic() {
+        interceptor.numConsumerCreated.set(0);
+        interceptor.numProducerCreated.set(0);
+        executor = Executors.newCachedThreadPool();
+        TopicName topicName = TopicName.get("test-topic-" + index++);
+        topic = topicName.toString();
+
+        final var mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + 
topicName.getPersistenceNamingEncoding();
+        delayTriggered = new CompletableFuture<>();
+        mockZooKeeper.delay(1000L, (op, path) -> {
+            final var result = path.equals(mlPath);
+            if (result) {
+                log.info("Injected delay for {} {}", op, path);
+                delayTriggered.complete(null);
+            }
+            return result;
+        });
+    }
+
+    @AfterMethod(alwaysRun = true, timeOut = 10000)
+    public void cleanupTopic() {
+        executor.shutdown();
+    }
+
+    @Test(timeOut = 10000)
+    public void testCreateProducer() throws Exception {
+        testCreateInterrupt("producer", () -> 
client.newProducer().topic(topic).create());
+    }
+
+    @Test(timeOut = 10000)
+    public void testSubscribe() throws Exception {
+        testCreateInterrupt("consumer", () -> 
client.newConsumer().topic(topic).subscriptionName("sub")
+                .subscribe());
+    }
+
+    @Test(timeOut = 10000)
+    public void testCreateReader() throws Exception {
+        testCreateInterrupt("reader", () -> 
client.newReader().topic(topic).startMessageId(MessageId.earliest)
+                .create());
+    }
+
+    private void testCreateInterrupt(String name, PulsarClientSyncTask task) 
throws Exception {
+        final var exception = new AtomicReference<PulsarClientException>();
+        final var threadInterrupted = new CompletableFuture<Boolean>();
+        final var future = executor.submit(() -> {
+            try {
+                task.run();
+                exception.set(new PulsarClientException("Task " + name + " 
succeeded"));
+            } catch (PulsarClientException e) {
+                exception.set(e);
+            }
+
+            try {
+                Thread.sleep(1);
+                threadInterrupted.complete(false);
+            } catch (InterruptedException __) {
+                threadInterrupted.complete(true);
+            }
+        });
+        delayTriggered.get();
+        future.cancel(true);
+
+        Awaitility.await().untilAsserted(() -> assertNotNull(exception.get()));
+        assertTrue(exception.get().getCause() instanceof InterruptedException);
+
+        Awaitility.await().untilAsserted(() -> 
assertTrue(pulsar.getBrokerService().getTopics().containsKey(topic)));
+        final var persistentTopic = (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(topic).get()
+                .orElseThrow();
+
+        if (name.equals("producer")) {
+            Awaitility.await().untilAsserted(() -> 
assertEquals(interceptor.numProducerCreated.get(), 1));
+            // Verify the created producer will eventually be closed
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals(persistentTopic.getProducers().size(), 0);
+                assertEquals(client.producersCount(), 0);
+            });
+        } else {
+            Awaitility.await().untilAsserted(() -> 
assertEquals(interceptor.numConsumerCreated.get(), 1));
+            // Verify the created consumer will eventually be closed
+            Awaitility.await().untilAsserted(() -> {
+                
persistentTopic.getSubscriptions().values().forEach(subscription ->
+                        assertTrue(subscription.getConsumers().isEmpty()));
+                assertEquals(client.consumersCount(), 0);
+            });
+        }
+        // The thread's interrupt state should not be set, it's the caller's 
responsibility to set the interrupt state
+        // if necessary when catching the `PulsarClientException` that wraps 
an `InterruptedException`
+        assertFalse(threadInterrupted.get());
+    }
+
+    private interface PulsarClientSyncTask {
+
+        void run() throws PulsarClientException;
+    }
+
+
+    private static class CreationInterceptor extends MockBrokerInterceptor  {
+
+        final AtomicInteger numProducerCreated = new AtomicInteger(0);
+        final AtomicInteger numConsumerCreated = new AtomicInteger(0);
+
+        @Override
+        public void producerCreated(ServerCnx cnx, Producer producer, 
Map<String, String> metadata) {
+            numProducerCreated.incrementAndGet();
+        }
+
+        @Override
+        public void consumerCreated(ServerCnx cnx, Consumer consumer, 
Map<String, String> metadata) {
+            numConsumerCreated.incrementAndGet();
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java
index 66fff46b660..b9355d19c27 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java
@@ -20,19 +20,13 @@ package org.apache.pulsar.client.impl;
 
 import static org.testng.Assert.assertTrue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
-import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.awaitility.Awaitility;
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -54,46 +48,6 @@ public class ConsumerCloseTest extends ProducerConsumerBase {
         super.internalCleanup();
     }
 
-    @Test
-    public void testInterruptedWhenCreateConsumer() throws 
InterruptedException {
-
-        String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
-        String subName = "test-sub";
-        String mlCursorPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + 
TopicName.get(tpName).getPersistenceNamingEncoding() + "/" + subName;
-
-        // Make create cursor delay 1s
-        CountDownLatch topicLoadLatch = new CountDownLatch(1);
-        for (int i = 0; i < 5; i++) {
-            mockZooKeeper.delay(1000, (op, path) -> {
-                if (mlCursorPath.equals(path)) {
-                    topicLoadLatch.countDown();
-                    return true;
-                }
-                return false;
-            });
-        }
-
-        Thread startConsumer = new Thread(() -> {
-            try {
-                pulsarClient.newConsumer()
-                        .topic(tpName)
-                        .subscriptionName(subName)
-                        .subscribe();
-                Assert.fail("Should have thrown an exception");
-            } catch (PulsarClientException e) {
-                assertTrue(e.getCause() instanceof InterruptedException);
-            }
-        });
-        startConsumer.start();
-        topicLoadLatch.await();
-        startConsumer.interrupt();
-
-        PulsarClientImpl clientImpl = (PulsarClientImpl) pulsarClient;
-        Awaitility.await().ignoreExceptions().atMost(10, 
TimeUnit.SECONDS).untilAsserted(() -> {
-            Assert.assertEquals(clientImpl.consumersCount(), 0);
-        });
-    }
-
     @Test
     public void testReceiveWillDoneAfterClosedConsumer() throws Exception {
         String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index bf796876fcd..c3ccedc3f03 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -71,7 +71,6 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
     private ConsumerConfigurationData<T> conf;
     private final Schema<T> schema;
     private List<ConsumerInterceptor<T>> interceptorList;
-    private volatile boolean interruptedBeforeConsumerCreation;
 
     private static final long MIN_ACK_TIMEOUT_MILLIS = 1000;
     private static final long MIN_TICK_TIME_MILLIS = 100;
@@ -101,31 +100,8 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
 
     @Override
     public Consumer<T> subscribe() throws PulsarClientException {
-        CompletableFuture<Consumer<T>> future = new CompletableFuture<>();
         try {
-            subscribeAsync().whenComplete((c, e) -> {
-                if (e != null) {
-                    // If the subscription fails, there is no need to close 
the consumer here,
-                    // as it will be handled in the subscribeAsync method.
-                    future.completeExceptionally(e);
-                    return;
-                }
-                if (interruptedBeforeConsumerCreation) {
-                    c.closeAsync().exceptionally(closeEx -> {
-                        log.error("Failed to close consumer after 
interruption", closeEx.getCause());
-                        return null;
-                    });
-                    future.completeExceptionally(new PulsarClientException(
-                            "Subscription was interrupted before the consumer 
could be fully created"));
-                } else {
-                    future.complete(c);
-                }
-            });
-            return future.get();
-        } catch (InterruptedException e) {
-            interruptedBeforeConsumerCreation = true;
-            Thread.currentThread().interrupt();
-            throw PulsarClientException.unwrap(e);
+            return FutureUtil.getAndCleanupOnInterrupt(subscribeAsync(), 
Consumer::closeAsync);
         } catch (Exception e) {
             throw PulsarClientException.unwrap(e);
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index ecbdfa76b64..0b5399ee258 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -84,7 +84,7 @@ public class ProducerBuilderImpl<T> implements 
ProducerBuilder<T> {
     @Override
     public Producer<T> create() throws PulsarClientException {
         try {
-            return createAsync().get();
+            return FutureUtil.getAndCleanupOnInterrupt(createAsync(), 
Producer::closeAsync);
         } catch (Exception e) {
             throw PulsarClientException.unwrap(e);
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index 7e74a8e9c9b..739e6ee44c6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -72,7 +72,7 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> 
{
     @Override
     public Reader<T> create() throws PulsarClientException {
         try {
-            return createAsync().get();
+            return FutureUtil.getAndCleanupOnInterrupt(createAsync(), 
Reader::closeAsync);
         } catch (Exception e) {
             throw PulsarClientException.unwrap(e);
         }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 454eee0f966..7bad2dc97c9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -381,4 +382,32 @@ public class FutureUtil {
                     return null;
                 });
     }
+
+    /**
+     * Blocks to get the result of a CompletableFuture, while ensuring 
resources are cleaned up
+     * if the wait is interrupted.
+     * <p>
+     * If the current thread is interrupted while waiting, this method 
registers a cleanup action
+     * to be executed when the future eventually completes. This prevents 
resource leaks that
+     * could otherwise occur when an interruption happens but the underlying 
asynchronous task
+     * finishes successfully later. After registering the action, it re-throws 
the
+     * {@link InterruptedException}.
+     *
+     * @param future         The CompletableFuture to wait for.
+     * @param cleanupAction  A consumer that performs a cleanup action (e.g., 
closing a resource)
+     * on the result if the wait is interrupted.
+     * @param <T>            The type of the future's result.
+     * @return The computed result from the future.
+     * @throws InterruptedException if the current thread was interrupted 
while waiting.
+     * @throws ExecutionException   if the future completed exceptionally.
+     */
+    public static <T> T getAndCleanupOnInterrupt(CompletableFuture<T> future, 
Consumer<T> cleanupAction)
+            throws InterruptedException, ExecutionException {
+        try {
+            return future.get();
+        } catch (InterruptedException e) {
+            future.thenAccept(cleanupAction);
+            throw e;
+        }
+    }
 }

Reply via email to