This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push: new 1038a745c35 Fixed deadlock when checking topic ownership (#16310) 1038a745c35 is described below commit 1038a745c352e1adda1e0b9f62d5fab23dce6b17 Author: Matteo Merli <mme...@apache.org> AuthorDate: Sat Jul 2 17:02:07 2022 -0700 Fixed deadlock when checking topic ownership (#16310) * Fixed deadlock when checking topic ownership * Fixed mocked test * Fixed ServerCnxTest * Fixed testProducerOnNotOwnedTopic --- .../pulsar/broker/namespace/NamespaceService.java | 9 ++++++ .../pulsar/broker/service/BrokerService.java | 33 ++++++++++++++-------- .../pulsar/broker/service/PersistentTopicTest.java | 1 + .../pulsar/broker/service/ServerCnxTest.java | 4 ++- 4 files changed, 35 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 4d910cb901d..fa4cd16ee11 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1008,6 +1008,15 @@ public class NamespaceService implements AutoCloseable { } } + public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName) { + Optional<CompletableFuture<OwnedBundle>> res = ownershipCache.getOwnedBundleAsync(getBundle(topicName)); + if (!res.isPresent()) { + return CompletableFuture.completedFuture(false); + } + + return res.get().thenApply(ob -> ob != null && ob.isActive()); + } + private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception { return ownershipCache.getOwnedBundle(getFullBundle(fqnn)) != null; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 24ef20452c0..bc6dbb73750 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1279,7 +1279,7 @@ public class BrokerService implements Closeable { final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); if (topicLoadSemaphore.tryAcquire()) { - createPersistentTopic(topic, createIfMissing, topicFuture); + checkOwnershipAndCreatePersistentTopic(topic, createIfMissing, topicFuture); topicFuture.handle((persistentTopic, ex) -> { // release permit and process pending topic topicLoadSemaphore.release(); @@ -1300,19 +1300,30 @@ public class BrokerService implements Closeable { return topicFuture; } - private void createPersistentTopic(final String topic, boolean createIfMissing, + private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean createIfMissing, CompletableFuture<Optional<Topic>> topicFuture) { + TopicName topicName = TopicName.get(topic); + pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName) + .thenAccept(isActive -> { + if (isActive) { + createPersistentTopic(topic, createIfMissing, topicFuture); + } else { + // namespace is being unloaded + String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic); + log.warn(msg); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); + } + }).exceptionally(ex -> { + topicFuture.completeExceptionally(ex); + return null; + }); + } + private void createPersistentTopic(final String topic, boolean createIfMissing, + CompletableFuture<Optional<Topic>> topicFuture) { final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); TopicName topicName = TopicName.get(topic); - if (!pulsar.getNamespaceService().isServiceUnitActive(topicName)) { - // namespace is being unloaded - String msg = String.format("Namespace is being unloaded, cannot add topic %s", 
topic); - log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); - return; - } if (isTransactionSystemTopic(topicName)) { String msg = String.format("Can not create transaction system topic %s", topic); @@ -2386,7 +2397,7 @@ public class BrokerService implements Closeable { CompletableFuture<Optional<Topic>> pendingFuture = pendingTopic.getRight(); final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); final boolean acquiredPermit = topicLoadSemaphore.tryAcquire(); - createPersistentTopic(topic, true, pendingFuture); + checkOwnershipAndCreatePersistentTopic(topic, true, pendingFuture); pendingFuture.handle((persistentTopic, ex) -> { // release permit and process next pending topic if (acquiredPermit) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 26004251399..3e527e8135a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -229,6 +229,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { doReturn(nsSvc).when(pulsar).getNamespaceService(); doReturn(true).when(nsSvc).isServiceUnitOwned(any()); doReturn(true).when(nsSvc).isServiceUnitActive(any()); + doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any()); setupMLAsyncCallbackMocks(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 5615a28a457..be9ed72cd14 100644 --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -198,6 +198,7 @@ public class ServerCnxTest { doReturn(namespaceService).when(pulsar).getNamespaceService(); doReturn(true).when(namespaceService).isServiceUnitOwned(any()); doReturn(true).when(namespaceService).isServiceUnitActive(any()); + doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any()); setupMLAsyncCallbackMocks(); @@ -463,7 +464,8 @@ public class ServerCnxTest { setChannelConnected(); // Force the case where the broker doesn't own any topic - doReturn(false).when(namespaceService).isServiceUnitActive(any(TopicName.class)); + doReturn(CompletableFuture.completedFuture(false)).when(namespaceService) + .isServiceUnitActiveAsync(any(TopicName.class)); // test PRODUCER failure case ByteBuf clientCommand = Commands.newProducer(nonOwnedTopicName, 1 /* producer id */, 1 /* request id */,