This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 1038a745c35 Fixed deadlock when checking topic ownership (#16310)
1038a745c35 is described below

commit 1038a745c352e1adda1e0b9f62d5fab23dce6b17
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Sat Jul 2 17:02:07 2022 -0700

    Fixed deadlock when checking topic ownership (#16310)
    
    * Fixed deadlock when checking topic ownership
    
    * Fixed mocked test
    
    * Fixed ServerCnxTest
    
    * Fixed testProducerOnNotOwnedTopic
---
 .../pulsar/broker/namespace/NamespaceService.java  |  9 ++++++
 .../pulsar/broker/service/BrokerService.java       | 33 ++++++++++++++--------
 .../pulsar/broker/service/PersistentTopicTest.java |  1 +
 .../pulsar/broker/service/ServerCnxTest.java       |  4 ++-
 4 files changed, 35 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 4d910cb901d..fa4cd16ee11 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1008,6 +1008,15 @@ public class NamespaceService implements AutoCloseable {
         }
     }
 
+    public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName) {
+        Optional<CompletableFuture<OwnedBundle>> res = ownershipCache.getOwnedBundleAsync(getBundle(topicName));
+        if (!res.isPresent()) {
+            return CompletableFuture.completedFuture(false);
+        }
+
+        return res.get().thenApply(ob -> ob != null && ob.isActive());
+    }
+
     private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception {
         return ownershipCache.getOwnedBundle(getFullBundle(fqnn)) != null;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 24ef20452c0..bc6dbb73750 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1279,7 +1279,7 @@ public class BrokerService implements Closeable {
                     final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get();
 
                     if (topicLoadSemaphore.tryAcquire()) {
-                        createPersistentTopic(topic, createIfMissing, topicFuture);
+                        checkOwnershipAndCreatePersistentTopic(topic, createIfMissing, topicFuture);
                         topicFuture.handle((persistentTopic, ex) -> {
                             // release permit and process pending topic
                             topicLoadSemaphore.release();
@@ -1300,19 +1300,30 @@ public class BrokerService implements Closeable {
         return topicFuture;
     }
 
-    private void createPersistentTopic(final String topic, boolean createIfMissing,
+    private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean createIfMissing,
                                        CompletableFuture<Optional<Topic>> topicFuture) {
+        TopicName topicName = TopicName.get(topic);
+        pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName)
+                .thenAccept(isActive -> {
+                    if (isActive) {
+                        createPersistentTopic(topic, createIfMissing, topicFuture);
+                    } else {
+                        // namespace is being unloaded
+                        String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic);
+                        log.warn(msg);
+                        pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
+                        topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
+                    }
+                }).exceptionally(ex -> {
+                    topicFuture.completeExceptionally(ex);
+                    return null;
+                });
+    }
 
+    private void createPersistentTopic(final String topic, boolean createIfMissing,
+                                       CompletableFuture<Optional<Topic>> topicFuture) {
         final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
         TopicName topicName = TopicName.get(topic);
-        if (!pulsar.getNamespaceService().isServiceUnitActive(topicName)) {
-            // namespace is being unloaded
-            String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic);
-            log.warn(msg);
-            pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
-            topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
-            return;
-        }
 
         if (isTransactionSystemTopic(topicName)) {
             String msg = String.format("Can not create transaction system topic %s", topic);
@@ -2386,7 +2397,7 @@ public class BrokerService implements Closeable {
             CompletableFuture<Optional<Topic>> pendingFuture = pendingTopic.getRight();
             final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get();
             final boolean acquiredPermit = topicLoadSemaphore.tryAcquire();
-            createPersistentTopic(topic, true, pendingFuture);
+            checkOwnershipAndCreatePersistentTopic(topic, true, pendingFuture);
             pendingFuture.handle((persistentTopic, ex) -> {
                 // release permit and process next pending topic
                 if (acquiredPermit) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 26004251399..3e527e8135a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -229,6 +229,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         doReturn(nsSvc).when(pulsar).getNamespaceService();
         doReturn(true).when(nsSvc).isServiceUnitOwned(any());
         doReturn(true).when(nsSvc).isServiceUnitActive(any());
+        doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
         doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any());
 
         setupMLAsyncCallbackMocks();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 5615a28a457..be9ed72cd14 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -198,6 +198,7 @@ public class ServerCnxTest {
         doReturn(namespaceService).when(pulsar).getNamespaceService();
         doReturn(true).when(namespaceService).isServiceUnitOwned(any());
         doReturn(true).when(namespaceService).isServiceUnitActive(any());
+        doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any());
         doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any());
 
         setupMLAsyncCallbackMocks();
@@ -463,7 +464,8 @@ public class ServerCnxTest {
         setChannelConnected();
 
         // Force the case where the broker doesn't own any topic
-        doReturn(false).when(namespaceService).isServiceUnitActive(any(TopicName.class));
+        doReturn(CompletableFuture.completedFuture(false)).when(namespaceService)
+                .isServiceUnitActiveAsync(any(TopicName.class));
 
         // test PRODUCER failure case
         ByteBuf clientCommand = Commands.newProducer(nonOwnedTopicName, 1 /* producer id */, 1 /* request id */,

Reply via email to