This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 893fef98209 Fixed deadlock when checking topic ownership (#16310)
893fef98209 is described below

commit 893fef982090b24b97ed8bab2581da7ed24d3513
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Sat Jul 2 17:02:07 2022 -0700

    Fixed deadlock when checking topic ownership (#16310)
    
    * Fixed deadlock when checking topic ownership
    
    * Fixed mocked test
    
    * Fixed ServerCnxTest
    
    * Fixed testProducerOnNotOwnedTopic
---
 .../pulsar/broker/namespace/NamespaceService.java  |  9 ++++++
 .../pulsar/broker/service/BrokerService.java       | 36 ++++++++++++++--------
 .../pulsar/broker/service/PersistentTopicTest.java |  1 +
 .../pulsar/broker/service/ServerCnxTest.java       |  4 ++-
 4 files changed, 37 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index e50f82cfa9e..86fd5b58662 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1020,6 +1020,15 @@ public class NamespaceService implements AutoCloseable {
         }
     }
 
+    public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName 
topicName) {
+        Optional<CompletableFuture<OwnedBundle>> res = 
ownershipCache.getOwnedBundleAsync(getBundle(topicName));
+        if (!res.isPresent()) {
+            return CompletableFuture.completedFuture(false);
+        }
+
+        return res.get().thenApply(ob -> ob != null && ob.isActive());
+    }
+
     private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception {
         return ownershipCache.getOwnedBundle(getFullBundle(fqnn)) != null;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index a83da67eb1e..7f6c256a3e8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1325,7 +1325,7 @@ public class BrokerService implements Closeable {
                     final Semaphore topicLoadSemaphore = 
topicLoadRequestSemaphore.get();
 
                     if (topicLoadSemaphore.tryAcquire()) {
-                        createPersistentTopic(topic, createIfMissing, 
topicFuture, properties);
+                        checkOwnershipAndCreatePersistentTopic(topic, 
createIfMissing, topicFuture, properties);
                         topicFuture.handle((persistentTopic, ex) -> {
                             // release permit and process pending topic
                             topicLoadSemaphore.release();
@@ -1346,20 +1346,32 @@ public class BrokerService implements Closeable {
         return topicFuture;
     }
 
-    private void createPersistentTopic(final String topic, boolean 
createIfMissing,
+    private void checkOwnershipAndCreatePersistentTopic(final String topic, 
boolean createIfMissing,
                                        CompletableFuture<Optional<Topic>> 
topicFuture,
                                        Map<String, String> properties) {
+        TopicName topicName = TopicName.get(topic);
+        pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName)
+                .thenAccept(isActive -> {
+                    if (isActive) {
+                        createPersistentTopic(topic, createIfMissing, 
topicFuture, properties);
+                    } else {
+                        // namespace is being unloaded
+                        String msg = String.format("Namespace is being 
unloaded, cannot add topic %s", topic);
+                        log.warn(msg);
+                        pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
+                        topicFuture.completeExceptionally(new 
ServiceUnitNotReadyException(msg));
+                    }
+                }).exceptionally(ex -> {
+                    topicFuture.completeExceptionally(ex);
+                    return null;
+                });
+    }
 
-        final long topicCreateTimeMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
+    private void createPersistentTopic(final String topic, boolean 
createIfMissing,
+                                       CompletableFuture<Optional<Topic>> 
topicFuture,
+                                       Map<String, String> properties) {
         TopicName topicName = TopicName.get(topic);
-        if (!pulsar.getNamespaceService().isServiceUnitActive(topicName)) {
-            // namespace is being unloaded
-            String msg = String.format("Namespace is being unloaded, cannot 
add topic %s", topic);
-            log.warn(msg);
-            pulsar.getExecutor().execute(() -> topics.remove(topic, 
topicFuture));
-            topicFuture.completeExceptionally(new 
ServiceUnitNotReadyException(msg));
-            return;
-        }
+        final long topicCreateTimeMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
 
         if (isTransactionInternalName(topicName)) {
             String msg = String.format("Can not create transaction system 
topic %s", topic);
@@ -2595,7 +2607,7 @@ public class BrokerService implements Closeable {
             CompletableFuture<Optional<Topic>> pendingFuture = 
pendingTopic.getTopicFuture();
             final Semaphore topicLoadSemaphore = 
topicLoadRequestSemaphore.get();
             final boolean acquiredPermit = topicLoadSemaphore.tryAcquire();
-            createPersistentTopic(topic, true, pendingFuture, 
pendingTopic.getProperties());
+            checkOwnershipAndCreatePersistentTopic(topic, true, pendingFuture, 
pendingTopic.getProperties());
             pendingFuture.handle((persistentTopic, ex) -> {
                 // release permit and process next pending topic
                 if (acquiredPermit) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index a9559ac96b9..a65f80e3fc7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -238,6 +238,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         doReturn(nsSvc).when(pulsar).getNamespaceService();
         doReturn(true).when(nsSvc).isServiceUnitOwned(any());
         doReturn(true).when(nsSvc).isServiceUnitActive(any());
+        
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
         
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any());
 
         setupMLAsyncCallbackMocks();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index dd86d6189d1..4a4f411ddb8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -214,6 +214,7 @@ public class ServerCnxTest {
         doReturn(namespaceService).when(pulsar).getNamespaceService();
         doReturn(true).when(namespaceService).isServiceUnitOwned(any());
         doReturn(true).when(namespaceService).isServiceUnitActive(any());
+        
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any());
         
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any());
         
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics(
                 NamespaceName.get("use", "ns-abc"), 
CommandGetTopicsOfNamespace.Mode.ALL);
@@ -489,7 +490,8 @@ public class ServerCnxTest {
         setChannelConnected();
 
         // Force the case where the broker doesn't own any topic
-        
doReturn(false).when(namespaceService).isServiceUnitActive(any(TopicName.class));
+        
doReturn(CompletableFuture.completedFuture(false)).when(namespaceService)
+                .isServiceUnitActiveAsync(any(TopicName.class));
 
         // test PRODUCER failure case
         ByteBuf clientCommand = Commands.newProducer(nonOwnedTopicName, 1 /* 
producer id */, 1 /* request id */,

Reply via email to