This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 92a19301a16b6879f43168fbd6fb25666e7e5b06
Author: fengyubiao <[email protected]>
AuthorDate: Thu Jun 5 10:02:53 2025 +0800

    [fix][broker] Fix deadlock when compaction and topic deletion execute concurrently (#24366)
    
    (cherry picked from commit 37e160f78f1504d3cd45523a08872f8f7d461df7)
---
 .../pulsar/broker/service/AbstractTopic.java       |  3 +
 .../broker/service/persistent/PersistentTopic.java |  1 +
 .../org/apache/pulsar/client/api/RawReader.java    | 11 ++--
 .../apache/pulsar/client/impl/RawReaderImpl.java   | 23 ++++++--
 .../org/apache/pulsar/compaction/Compactor.java    |  2 +-
 .../pulsar/client/impl/ConsumerCloseTest.java      | 42 +++++++++++++-
 .../apache/pulsar/client/impl/RawReaderTest.java   | 66 ++++++++++++++++++++-
 .../apache/pulsar/compaction/CompactionTest.java   | 67 ++++++++++++++++++++++
 .../pulsar/client/api/PulsarClientException.java   | 11 ++++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  2 +-
 .../pulsar/client/impl/ConnectionHandler.java      | 22 +++++--
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 56 +++++++++++++-----
 .../apache/pulsar/client/impl/ProducerImpl.java    | 39 ++++++++++---
 .../pulsar/client/impl/TopicListWatcher.java       |  4 +-
 .../client/impl/TransactionMetaStoreHandler.java   |  4 +-
 15 files changed, 309 insertions(+), 44 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 20b2abfa48e..6d3c9a58b03 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -22,6 +22,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
 import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import java.time.Clock;
 import java.util.ArrayList;
@@ -114,6 +115,8 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener {
 
     protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
+    @VisibleForTesting
+    @Getter
     protected volatile boolean isFenced;
 
     protected final HierarchyTopicPolicies topicPolicies;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e147796d5a6..31024f6976c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -270,6 +270,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     private volatile double lastUpdatedAvgPublishRateInMsg = 0;
     private volatile double lastUpdatedAvgPublishRateInByte = 0;
 
+    @Getter
     private volatile boolean isClosingOrDeleting = false;
 
     private ScheduledFuture<?> fencedTopicMonitoringTask = null;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
index 55483708fdf..ae4927da5ce 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
@@ -33,23 +33,24 @@ public interface RawReader {
      */
 
     static CompletableFuture<RawReader> create(PulsarClient client, String 
topic, String subscription) {
-        return create(client, topic, subscription, true);
+        return create(client, topic, subscription, true, true);
     }
 
     static CompletableFuture<RawReader> create(PulsarClient client, String 
topic, String subscription,
-                                               boolean 
createTopicIfDoesNotExist) {
+                                               boolean 
createTopicIfDoesNotExist, boolean retryOnRecoverableErrors) {
         CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>();
         RawReader r =
-                new RawReaderImpl((PulsarClientImpl) client, topic, 
subscription, future, createTopicIfDoesNotExist);
+                new RawReaderImpl((PulsarClientImpl) client, topic, 
subscription, future, createTopicIfDoesNotExist,
+                        retryOnRecoverableErrors);
         return future.thenApply(__ -> r);
     }
 
     static CompletableFuture<RawReader> create(PulsarClient client,
                                                
ConsumerConfigurationData<byte[]> consumerConfiguration,
-                                               boolean 
createTopicIfDoesNotExist) {
+                                               boolean 
createTopicIfDoesNotExist, boolean retryOnRecoverableErrors) {
         CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>();
         RawReader r = new RawReaderImpl((PulsarClientImpl) client,
-                consumerConfiguration, future, createTopicIfDoesNotExist);
+                consumerConfiguration, future, createTopicIfDoesNotExist, 
retryOnRecoverableErrors);
         return future.thenApply(__ -> r);
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 5ac051d2271..32f75d71dc3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Schema;
@@ -52,7 +53,7 @@ public class RawReaderImpl implements RawReader {
 
     public RawReaderImpl(PulsarClientImpl client, String topic, String 
subscription,
                          CompletableFuture<Consumer<byte[]>> consumerFuture,
-                         boolean createTopicIfDoesNotExist) {
+                         boolean createTopicIfDoesNotExist, boolean 
retryOnRecoverableErrors) {
         consumerConfiguration = new ConsumerConfigurationData<>();
         consumerConfiguration.getTopicNames().add(topic);
         consumerConfiguration.setSubscriptionName(subscription);
@@ -62,14 +63,16 @@ public class RawReaderImpl implements RawReader {
         
consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
         consumerConfiguration.setAckReceiptEnabled(true);
 
-        consumer = new RawConsumerImpl(client, consumerConfiguration, 
consumerFuture, createTopicIfDoesNotExist);
+        consumer = new RawConsumerImpl(client, consumerConfiguration, 
consumerFuture, createTopicIfDoesNotExist,
+                retryOnRecoverableErrors);
     }
 
     public RawReaderImpl(PulsarClientImpl client, 
ConsumerConfigurationData<byte[]> consumerConfiguration,
                          CompletableFuture<Consumer<byte[]>> consumerFuture,
-                         boolean createTopicIfDoesNotExist) {
+                         boolean createTopicIfDoesNotExist, boolean 
retryOnRecoverableErrors) {
         this.consumerConfiguration = consumerConfiguration;
-        consumer = new RawConsumerImpl(client, consumerConfiguration, 
consumerFuture, createTopicIfDoesNotExist);
+        consumer = new RawConsumerImpl(client, consumerConfiguration, 
consumerFuture, createTopicIfDoesNotExist,
+                retryOnRecoverableErrors);
     }
 
 
@@ -117,9 +120,11 @@ public class RawReaderImpl implements RawReader {
     static class RawConsumerImpl extends ConsumerImpl<byte[]> {
         final BlockingQueue<RawMessageAndCnx> incomingRawMessages;
         final Queue<CompletableFuture<RawMessage>> pendingRawReceives;
+        final boolean retryOnRecoverableErrors;
 
         RawConsumerImpl(PulsarClientImpl client, 
ConsumerConfigurationData<byte[]> conf,
-                CompletableFuture<Consumer<byte[]>> consumerFuture, boolean 
createTopicIfDoesNotExist) {
+                CompletableFuture<Consumer<byte[]>> consumerFuture, boolean 
createTopicIfDoesNotExist,
+                boolean retryOnRecoverableErrors) {
             super(client,
                     conf.getSingleTopic(),
                     conf,
@@ -135,6 +140,14 @@ public class RawReaderImpl implements RawReader {
             );
             incomingRawMessages = new GrowableArrayBlockingQueue<>();
             pendingRawReceives = new ConcurrentLinkedQueue<>();
+            this.retryOnRecoverableErrors = retryOnRecoverableErrors;
+        }
+
+        protected boolean isUnrecoverableError(Throwable t) {
+            if (!retryOnRecoverableErrors && (t instanceof 
PulsarClientException.ServiceNotReadyException)) {
+                return true;
+            }
+            return super.isUnrecoverableError(t);
         }
 
         void tryCompletePending() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
index 983443432ff..d37298757db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
@@ -56,7 +56,7 @@ public abstract class Compactor {
     }
 
     public CompletableFuture<Long> compact(String topic) {
-        return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION, 
false).thenComposeAsync(
+        return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION, false, 
false).thenComposeAsync(
                 this::compactAndCloseReader, scheduler);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java
index 3a446101990..66fff46b660 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java
@@ -18,14 +18,19 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.testng.Assert.assertTrue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -76,7 +81,7 @@ public class ConsumerCloseTest extends ProducerConsumerBase {
                         .subscribe();
                 Assert.fail("Should have thrown an exception");
             } catch (PulsarClientException e) {
-                Assert.assertTrue(e.getCause() instanceof 
InterruptedException);
+                assertTrue(e.getCause() instanceof InterruptedException);
             }
         });
         startConsumer.start();
@@ -88,4 +93,39 @@ public class ConsumerCloseTest extends ProducerConsumerBase {
             Assert.assertEquals(clientImpl.consumersCount(), 0);
         });
     }
+
+    @Test
+    public void testReceiveWillDoneAfterClosedConsumer() throws Exception {
+        String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        String subName = "test-sub";
+        admin.topics().createNonPartitionedTopic(tpName);
+        admin.topics().createSubscription(tpName, subName, MessageId.earliest);
+        ConsumerImpl<byte[]> consumer =
+                (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(tpName).subscriptionName(subName).subscribe();
+        CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
+        consumer.close();
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(future.isDone());
+        });
+    }
+
+    @Test
+    public void testReceiveWillDoneAfterTopicDeleted() throws Exception {
+        String namespace = "public/default";
+        admin.namespaces().setAutoTopicCreation(namespace, 
AutoTopicCreationOverride.builder()
+                .allowAutoTopicCreation(false).build());
+        String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        String subName = "test-sub";
+        admin.topics().createNonPartitionedTopic(tpName);
+        admin.topics().createSubscription(tpName, subName, MessageId.earliest);
+        ConsumerImpl<byte[]> consumer =
+                (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(tpName).subscriptionName(subName).subscribe();
+        CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
+        admin.topics().delete(tpName, true);
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(future.isDone());
+        });
+        // cleanup.
+        admin.namespaces().removeAutoTopicCreation(namespace);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index d9ddc00b2e8..a88d590f202 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -27,12 +27,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.lang3.tuple.ImmutableTriple;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -44,6 +49,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -54,12 +60,17 @@ import 
org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.awaitility.Awaitility;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static 
org.apache.pulsar.client.impl.RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 
 @Test(groups = "broker-impl")
 @Slf4j
@@ -215,7 +226,7 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
         consumerConfiguration.setReadCompacted(true);
         
consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
         consumerConfiguration.setAckReceiptEnabled(true);
-        RawReader reader = RawReader.create(pulsarClient, 
consumerConfiguration, true).get();
+        RawReader reader = RawReader.create(pulsarClient, 
consumerConfiguration, true, true).get();
 
         MessageId lastMessageId = reader.getLastMessageIdAsync().get();
         while (true) {
@@ -547,11 +558,62 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
 
         String topic2 = "persistent://my-property/my-ns/" + 
BrokerTestUtil.newUniqueName("reader");
         try {
-            reader = RawReader.create(pulsarClient, topic2, subscription, 
false).get();
+            reader = RawReader.create(pulsarClient, topic2, subscription, 
false, true).get();
             Assert.fail();
         } catch (Exception e) {
             Assert.assertTrue(e.getCause() instanceof 
PulsarClientException.TopicDoesNotExistException);
         }
         reader.closeAsync().join();
     }
+
+    @Test(timeOut = 60000)
+    public void testReconnectsWhenServiceNotReady() throws Exception {
+        String topic = "persistent://my-property/my-ns/" + 
BrokerTestUtil.newUniqueName("reader");
+        String subscriptionName = "s1";
+        admin.topics().createNonPartitionedTopic(topic);
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        RawReader reader = RawReader.create(pulsarClient, topic, 
subscription).get();
+
+        // Delay the cursor close during topic unload, so that the raw-reader will get a ServiceNotReady error.
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, 
false).get().get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+        ManagedCursor compactionCursor = ml.openCursor(subscriptionName);
+        ManagedCursor spyCompactionCursor = spy(compactionCursor);
+        CountDownLatch delayCloseCursorSignal = new CountDownLatch(1);
+        Answer answer = new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+                delayCloseCursorSignal.await();
+                return invocationOnMock.callRealMethod();
+            }
+        };
+        
doAnswer(answer).when(spyCompactionCursor).asyncClose(any(AsyncCallbacks.CloseCallback.class),
 any());
+        ml.getCursors().removeCursor(subscriptionName);
+        ml.getCursors().add(spyCompactionCursor, ml.getLastConfirmedEntry());
+
+        // Unload topic after reader is connected.
+        // The topic becomes "fenced", so the RawReader will get a ServiceNotReady error.
+        CompletableFuture<RawMessage> msgFuture = reader.readNextAsync();
+        CompletableFuture<Void> unloadFuture = 
admin.topics().unloadAsync(topic);
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertTrue(persistentTopic.isFenced());
+        });
+
+        // Verify: the RawReader reconnects after the unloading finishes, and it can consume successfully.
+        delayCloseCursorSignal.countDown();
+        unloadFuture.get();
+        MessageIdImpl msgIdSent = (MessageIdImpl) producer.send("msg");
+        RawMessage rawMessage = msgFuture.get();
+        Assert.assertNotNull(rawMessage);
+        MessageIdImpl msgIdReceived = (MessageIdImpl) 
rawMessage.getMessageId();
+        Assert.assertEquals(msgIdSent.getLedgerId(), 
msgIdReceived.getLedgerId());
+        Assert.assertEquals(msgIdSent.getEntryId(), 
msgIdReceived.getEntryId());
+
+        // cleanup.
+        rawMessage.close();;
+        producer.close();
+        reader.closeAsync().get();
+        admin.topics().delete(topic, false);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index ff9026dbba6..f43e2a1c672 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -20,7 +20,11 @@ package org.apache.pulsar.compaction;
 
 import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithoutRecordingInvocations;
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -60,9 +64,11 @@ import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.api.OpenBuilder;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -101,6 +107,8 @@ import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
@@ -2320,6 +2328,65 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         producer.close();
     }
 
+    @Test(timeOut = 120 * 1000)
+    public void testConcurrentCompactionAndTopicDelete() throws Exception {
+        final String topicName = 
newUniqueName("persistent://my-tenant/my-ns/tp");
+        admin.topics().createNonPartitionedTopic(topicName);
+        // Load up the topic.
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+
+        // Inject a read delay into the compaction cursor so that the compaction task stalls.
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+        ManagedCursor compactionCursor = 
ml.openCursor(COMPACTION_SUBSCRIPTION);
+        ManagedCursor spyCompactionCursor = spy(compactionCursor);
+        CountDownLatch delayReadSignal = new CountDownLatch(1);
+        Answer answer = new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+                delayReadSignal.await();
+                return invocationOnMock.callRealMethod();
+            }
+        };
+        doAnswer(answer).when(spyCompactionCursor).asyncReadEntries(anyInt(),
+                any(AsyncCallbacks.ReadEntriesCallback.class), any(), 
any(Position.class));
+        doAnswer(answer).when(spyCompactionCursor).asyncReadEntries(anyInt(), 
anyLong(),
+                any(AsyncCallbacks.ReadEntriesCallback.class), any(), 
any(Position.class));
+        
doAnswer(answer).when(spyCompactionCursor).asyncReadEntriesOrWait(anyInt(), 
anyLong(),
+                any(AsyncCallbacks.ReadEntriesCallback.class), any(), 
any(Position.class));
+        ml.getCursors().removeCursor(COMPACTION_SUBSCRIPTION);
+        ml.getCursors().add(spyCompactionCursor, ml.getLastConfirmedEntry());
+
+        // Trigger a compaction task.
+        for (int i = 0; i < 2000; i++) {
+            
producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
+        }
+        ConsumerImpl<String> consumer = (ConsumerImpl<String>) 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName).readCompacted(true).subscriptionName("s1")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+        persistentTopic.triggerCompaction();
+        Awaitility.await().untilAsserted(() -> {
+           
assertEquals(persistentTopic.getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(),
 1);
+        });
+
+        // Since we injected a read delay, the compaction task has started but not finished yet.
+        // Trigger topic deletion so that the two tasks execute concurrently.
+        producer.close();
+        consumer.close();
+        CompletableFuture<Void> deleteTopicFuture = 
persistentTopic.deleteForcefully();
+
+        // Remove the injection after 3s.
+        Thread.sleep(3000);
+        delayReadSignal.countDown();
+
+        // Verify: topic deletion is successfully executed.
+        Awaitility.await().atMost(15, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertTrue(deleteTopicFuture.isDone());
+        });
+    }
+
     @Test
     public void testEarliestSubsAfterRollover() throws Exception {
         final String topicName = 
newUniqueName("persistent://my-tenant/my-ns/testEarliestSubsAfterRollover");
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index b2c9b2b697b..bf32014af7a 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -362,6 +362,17 @@ public class PulsarClientException extends IOException {
         }
     }
 
+    /**
+     * Client-side exception for the broker's {@code ServiceNotReady} error, which relates to the server-side errors:
+     *  ServiceUnitNotReadyException, TopicFencedException and SubscriptionFencedException.
+     */
+    public static class ServiceNotReadyException extends LookupException {
+
+        public ServiceNotReadyException(String msg) {
+            super(msg);
+        }
+    }
+
     /**
      * Connect exception thrown by Pulsar client.
      */
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 24fd0034eb0..0563fa7e666 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -1353,7 +1353,7 @@ public class ClientCnx extends PulsarHandler {
         case PersistenceError:
             return new 
PulsarClientException.BrokerPersistenceException(errorMsg);
         case ServiceNotReady:
-            return new PulsarClientException.LookupException(errorMsg);
+            return new 
PulsarClientException.ServiceNotReadyException(errorMsg);
         case TooManyRequests:
             return new 
PulsarClientException.TooManyRequestsException(errorMsg);
         case ProducerBlockedQuotaExceededError:
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index fb5b6788d08..a4e262b35ae 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -64,7 +64,15 @@ public class ConnectionHandler {
          * @apiNote If the returned future is completed exceptionally, 
reconnectLater will be called.
          */
         CompletableFuture<Void> connectionOpened(ClientCnx cnx);
-        default void connectionFailed(PulsarClientException e) {
+
+        /**
+         * Called when the connection handler fails to get a connection.
+         * @param e the error that occurred while trying to get a connection
+         * @return {@code true} to let the connection handler retry getting a connection; {@code false} to
+         * stop getting a new connection, in which case the consumer/producer should release its resources.
+         */
+        default boolean connectionFailed(PulsarClientException e) {
+            return true;
         }
     }
 
@@ -142,22 +150,24 @@ public class ConnectionHandler {
     }
 
     private Void handleConnectionError(Throwable exception) {
+        boolean toRetry = true;
         try {
             log.warn("[{}] [{}] Error connecting to broker: {}",
                     state.topic, state.getHandlerName(), 
exception.getMessage());
             if (exception instanceof PulsarClientException) {
-                connection.connectionFailed((PulsarClientException) exception);
+                toRetry = connection.connectionFailed((PulsarClientException) 
exception);
             } else if (exception.getCause() instanceof PulsarClientException) {
-                connection.connectionFailed((PulsarClientException) 
exception.getCause());
+                toRetry = connection.connectionFailed((PulsarClientException) 
exception.getCause());
             } else {
-                connection.connectionFailed(new 
PulsarClientException(exception));
+                toRetry = connection.connectionFailed(new 
PulsarClientException(exception));
             }
         } catch (Throwable throwable) {
             log.error("[{}] [{}] Unexpected exception after the connection",
                     state.topic, state.getHandlerName(), throwable);
         }
-
-        reconnectLater(exception);
+        if (toRetry) {
+            reconnectLater(exception);
+        }
         return null;
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 275966c26b3..f575a127c96 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -974,6 +974,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
                 if (e.getCause() instanceof PulsarClientException
                         && PulsarClientException.isRetriableError(e.getCause())
+                        && !isUnrecoverableError(e.getCause())
                         && System.currentTimeMillis() < 
SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
                     future.completeExceptionally(e.getCause());
                 } else if (!subscribeFuture.isDone()) {
@@ -985,18 +986,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                             + "with subscription name %s when 
connecting to the broker",
                                     topicName.toString(), subscription)));
                     client.cleanupConsumer(this);
-                } else if (e.getCause() instanceof TopicDoesNotExistException) 
{
-                    // The topic was deleted after the consumer was created, 
and we're
-                    // not allowed to recreate the topic. This can happen in 
few cases:
-                    //  * Regex consumer getting error after topic gets deleted
-                    //  * Regular consumer after topic is manually delete and 
with
-                    //    auto-topic-creation set to false
-                    // No more retries are needed in this case.
-                    setState(State.Failed);
-                    closeConsumerTasks();
-                    client.cleanupConsumer(this);
-                    log.warn("[{}][{}] Closed consumer because topic does not 
exist anymore {}",
-                            topic, subscription, 
cnx.channel().remoteAddress());
+                } else if (isUnrecoverableError(e.getCause())) {
+                    closeWhenReceivedUnrecoverableError(e.getCause(), cnx);
                 } else {
                     // consumer was subscribed and connected but we got some 
error, keep trying
                     future.completeExceptionally(e.getCause());
@@ -1011,6 +1002,37 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         return future;
     }
 
+    /**
+     * Returns true when {@code t} cannot be fixed by retrying; subclasses may add more unrecoverable errors.
+     */
+    protected boolean isUnrecoverableError(Throwable t) {
+        // TopicDoesNotExistException: topic has been deleted.
+        // NotFoundException: topic has been deleted.
+        // IllegalStateException: consumer has been closed.
+        return (t instanceof TopicDoesNotExistException) || (t instanceof IllegalStateException)
+                || (t instanceof PulsarClientException.NotFoundException);
+    }
+
+    protected void closeWhenReceivedUnrecoverableError(Throwable t, ClientCnx cnx) {
+        // The error cannot be fixed by retrying, e.g. the topic was deleted after the consumer was created
+        // and we're not allowed to recreate it. This can happen in a few cases:
+        //  * Regex consumer getting error after topic gets deleted
+        //  * Regular consumer after topic is manually deleted and with
+        //    auto-topic-creation set to false
+        // No more retries are needed in this case.
+        final String cnxStr = cnx == null ? "null" : String.valueOf(cnx.channel().remoteAddress());
+        log.warn("[{}][{}] {} Closed consumer because get an error that does not support to retry: {} {}",
+                topic, subscription, cnxStr, t.getClass().getName(), t.getMessage());
+        closeAsync().whenComplete((__, ex) -> {
+            if (ex == null) {
+                setState(State.Failed);
+                return;
+            }
+            log.error("[{}][{}] {} Failed to close consumer after got an error that does not support to retry: {} {}",
+                    topic, subscription, cnxStr, t.getClass().getName(), t.getMessage(), ex);
+        });
+    }
+
     protected void consumerIsReconnectedToBroker(ClientCnx cnx, int 
currentQueueSize) {
         log.info("[{}][{}] Subscribed to topic on {} -- consumer: {}", topic, 
subscription,
                 cnx.channel().remoteAddress(), consumerId);
@@ -1091,7 +1113,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     }
 
     @Override
-    public void connectionFailed(PulsarClientException exception) {
+    public boolean connectionFailed(PulsarClientException exception) {
         boolean nonRetriableError = 
!PulsarClientException.isRetriableError(exception);
         boolean timeout = System.currentTimeMillis() > lookupDeadline;
         if (nonRetriableError || timeout) {
@@ -1107,10 +1129,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 closeConsumerTasks();
                 deregisterFromClientCnx();
                 client.cleanupConsumer(this);
+                return false;
+            } else {
+                Throwable actError = 
FutureUtil.unwrapCompletionException(exception);
+                if (isUnrecoverableError(actError)) {
+                    closeWhenReceivedUnrecoverableError(actError, null);
+                    return false;
+                }
             }
         } else {
             previousExceptionCount.incrementAndGet();
         }
+        return true;
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 50a6f9aa81b..e9decbfa0f5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1965,13 +1965,8 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             }
 
             // Close the producer since topic does not exist.
-            if (cause instanceof 
PulsarClientException.TopicDoesNotExistException) {
-                closeAsync().whenComplete((v, ex) -> {
-                    if (ex != null) {
-                        log.error("Failed to close producer on 
TopicDoesNotExistException.", ex);
-                    }
-                    producerCreatedFuture.completeExceptionally(cause);
-                });
+            if (isUnrecoverableError(cause)) {
+                closeWhenReceivedUnrecoverableError(cause, cnx);
                 future.complete(null);
                 return null;
             }
@@ -2043,8 +2038,28 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         return future;
     }
 
+    protected boolean isUnrecoverableError(Throwable t) {
+        // Errors that retrying cannot fix, so the producer is closed instead of reconnecting:
+        //  * TopicDoesNotExistException / NotFoundException: topic has been deleted.
+        //  * IllegalStateException: producer has been closed.
+        return (t instanceof PulsarClientException.TopicDoesNotExistException) || (t instanceof IllegalStateException)
+                || (t instanceof PulsarClientException.NotFoundException);
+    }
+
+    protected void closeWhenReceivedUnrecoverableError(Throwable t, ClientCnx cnx) {
+        final String cnxStr = cnx == null ? "null" : String.valueOf(cnx.channel().remoteAddress());
+        log.warn("[{}][{}] {} Closed producer because get an error that does not support to retry: {} {}",
+                topic, producerName, cnxStr, t.getClass().getName(), t.getMessage());
+        closeAsync().whenComplete((v, ex) -> {
+            if (ex != null) {
+                log.error("[{}][{}] Failed to close producer on {}", topic, producerName, t.getClass().getName(), ex);
+            }
+            producerCreatedFuture.completeExceptionally(t);
+        });
+    }
+
     @Override
-    public void connectionFailed(PulsarClientException exception) {
+    public boolean connectionFailed(PulsarClientException exception) {
         boolean nonRetriableError = 
!PulsarClientException.isRetriableError(exception);
         boolean timeout = System.currentTimeMillis() > lookupDeadline;
         if (nonRetriableError || timeout) {
@@ -2059,10 +2074,18 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 closeProducerTasks();
                 setState(State.Failed);
                 client.cleanupProducer(this);
+                return false;
+            } else {
+                Throwable actError = 
FutureUtil.unwrapCompletionException(exception);
+                if (isUnrecoverableError(actError)) {
+                    closeWhenReceivedUnrecoverableError(actError, null);
+                    return false;
+                }
             }
         } else {
             previousExceptionCount.incrementAndGet();
         }
+        return true;
     }
 
     private void closeProducerTasks() {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
index 93fa7082f33..5357b154622 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -89,7 +89,7 @@ public class TopicListWatcher extends HandlerState implements 
ConnectionHandler.
     }
 
     @Override
-    public void connectionFailed(PulsarClientException exception) {
+    public boolean connectionFailed(PulsarClientException exception) {
         boolean nonRetriableError = 
!PulsarClientException.isRetriableError(exception);
         if (nonRetriableError) {
             exception.setPreviousExceptionCount(previousExceptionCount);
@@ -98,10 +98,12 @@ public class TopicListWatcher extends HandlerState 
implements ConnectionHandler.
                 log.info("[{}] Watcher creation failed for {} with 
non-retriable error {}",
                         topic, name, exception.getMessage());
                 deregisterFromClientCnx();
+                return false;
             }
         } else {
             previousExceptionCount.incrementAndGet();
         }
+        return true;
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index a1fe78d7290..4ca742d98ea 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -122,7 +122,7 @@ public class TransactionMetaStoreHandler extends 
HandlerState
     }
 
     @Override
-    public void connectionFailed(PulsarClientException exception) {
+    public boolean connectionFailed(PulsarClientException exception) {
         boolean nonRetriableError = 
!PulsarClientException.isRetriableError(exception);
         boolean timeout = System.currentTimeMillis() > lookupDeadline;
         if (nonRetriableError || timeout) {
@@ -136,10 +136,12 @@ public class TransactionMetaStoreHandler extends 
HandlerState
                             + "timeout", transactionCoordinatorId, exception);
                 }
                 setState(State.Failed);
+                return false;
             }
         } else {
             previousExceptionCount.getAndIncrement();
         }
+        return true;
     }
 
     @Override


Reply via email to