This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 5dc57dc9304 Fixed deadlock when checking topic ownership (#16310)
5dc57dc9304 is described below

commit 5dc57dc9304fa18dd6d6eb6da84745dde49a5818
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Sat Jul 2 17:02:07 2022 -0700

    Fixed deadlock when checking topic ownership (#16310)
    
    * Fixed deadlock when checking topic ownership
    
    * Fixed mocked test
    
    * Fixed ServerCnxTest
    
    * Fixed testProducerOnNotOwnedTopic
---
 .../pulsar/broker/namespace/NamespaceService.java  |  9 ++++++
 .../pulsar/broker/service/BrokerService.java       | 36 ++++++++++++++--------
 .../pulsar/broker/service/PersistentTopicTest.java |  1 +
 .../pulsar/broker/service/ServerCnxTest.java       |  4 ++-
 4 files changed, 37 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 9cc427aae67..5912dca1339 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1014,6 +1014,15 @@ public class NamespaceService implements AutoCloseable {
         }
     }
 
+    public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName 
topicName) {
+        Optional<CompletableFuture<OwnedBundle>> res = 
ownershipCache.getOwnedBundleAsync(getBundle(topicName));
+        if (!res.isPresent()) {
+            return CompletableFuture.completedFuture(false);
+        }
+
+        return res.get().thenApply(ob -> ob != null && ob.isActive());
+    }
+
     private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception {
         return ownershipCache.getOwnedBundle(getFullBundle(fqnn)) != null;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7b442698e92..09601033ca3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1348,7 +1348,7 @@ public class BrokerService implements Closeable {
                     final Semaphore topicLoadSemaphore = 
topicLoadRequestSemaphore.get();
 
                     if (topicLoadSemaphore.tryAcquire()) {
-                        createPersistentTopic(topic, createIfMissing, 
topicFuture, properties);
+                        checkOwnershipAndCreatePersistentTopic(topic, 
createIfMissing, topicFuture, properties);
                         topicFuture.handle((persistentTopic, ex) -> {
                             // release permit and process pending topic
                             topicLoadSemaphore.release();
@@ -1369,20 +1369,32 @@ public class BrokerService implements Closeable {
         return topicFuture;
     }
 
-    private void createPersistentTopic(final String topic, boolean 
createIfMissing,
+    private void checkOwnershipAndCreatePersistentTopic(final String topic, 
boolean createIfMissing,
                                        CompletableFuture<Optional<Topic>> 
topicFuture,
                                        Map<String, String> properties) {
+        TopicName topicName = TopicName.get(topic);
+        pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName)
+                .thenAccept(isActive -> {
+                    if (isActive) {
+                        createPersistentTopic(topic, createIfMissing, 
topicFuture, properties);
+                    } else {
+                        // namespace is being unloaded
+                        String msg = String.format("Namespace is being 
unloaded, cannot add topic %s", topic);
+                        log.warn(msg);
+                        pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
+                        topicFuture.completeExceptionally(new 
ServiceUnitNotReadyException(msg));
+                    }
+                }).exceptionally(ex -> {
+                    topicFuture.completeExceptionally(ex);
+                    return null;
+                });
+    }
 
-        final long topicCreateTimeMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
+    private void createPersistentTopic(final String topic, boolean 
createIfMissing,
+                                       CompletableFuture<Optional<Topic>> 
topicFuture,
+                                       Map<String, String> properties) {
         TopicName topicName = TopicName.get(topic);
-        if (!pulsar.getNamespaceService().isServiceUnitActive(topicName)) {
-            // namespace is being unloaded
-            String msg = String.format("Namespace is being unloaded, cannot 
add topic %s", topic);
-            log.warn(msg);
-            pulsar.getExecutor().execute(() -> topics.remove(topic, 
topicFuture));
-            topicFuture.completeExceptionally(new 
ServiceUnitNotReadyException(msg));
-            return;
-        }
+        final long topicCreateTimeMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
 
         if (isTransactionSystemTopic(topicName)) {
             String msg = String.format("Can not create transaction system 
topic %s", topic);
@@ -2496,7 +2508,7 @@ public class BrokerService implements Closeable {
             CompletableFuture<Optional<Topic>> pendingFuture = 
pendingTopic.getTopicFuture();
             final Semaphore topicLoadSemaphore = 
topicLoadRequestSemaphore.get();
             final boolean acquiredPermit = topicLoadSemaphore.tryAcquire();
-            createPersistentTopic(topic, true, pendingFuture, 
pendingTopic.getProperties());
+            checkOwnershipAndCreatePersistentTopic(topic, true, pendingFuture, 
pendingTopic.getProperties());
             pendingFuture.handle((persistentTopic, ex) -> {
                 // release permit and process next pending topic
                 if (acquiredPermit) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index e376ec990fd..811a300342a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -228,6 +228,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         doReturn(nsSvc).when(pulsar).getNamespaceService();
         doReturn(true).when(nsSvc).isServiceUnitOwned(any());
         doReturn(true).when(nsSvc).isServiceUnitActive(any());
+        
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
         
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any());
 
         setupMLAsyncCallbackMocks();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 0cdc8184afd..12a4f0db18d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -198,6 +198,7 @@ public class ServerCnxTest {
         doReturn(namespaceService).when(pulsar).getNamespaceService();
         doReturn(true).when(namespaceService).isServiceUnitOwned(any());
         doReturn(true).when(namespaceService).isServiceUnitActive(any());
+        
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any());
         
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any());
 
         setupMLAsyncCallbackMocks();
@@ -469,7 +470,8 @@ public class ServerCnxTest {
         setChannelConnected();
 
         // Force the case where the broker doesn't own any topic
-        
doReturn(false).when(namespaceService).isServiceUnitActive(any(TopicName.class));
+        
doReturn(CompletableFuture.completedFuture(false)).when(namespaceService)
+                .isServiceUnitActiveAsync(any(TopicName.class));
 
         // test PRODUCER failure case
         ByteBuf clientCommand = Commands.newProducer(nonOwnedTopicName, 1 /* 
producer id */, 1 /* request id */,

Reply via email to