This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push: new 5dc57dc9304 Fixed deadlock when checking topic ownership (#16310) 5dc57dc9304 is described below commit 5dc57dc9304fa18dd6d6eb6da84745dde49a5818 Author: Matteo Merli <mme...@apache.org> AuthorDate: Sat Jul 2 17:02:07 2022 -0700 Fixed deadlock when checking topic ownership (#16310) * Fixed deadlock when checking topic ownership * Fixed mocked test * Fixed ServerCnxTest * Fixed testProducerOnNotOwnedTopic --- .../pulsar/broker/namespace/NamespaceService.java | 9 ++++++ .../pulsar/broker/service/BrokerService.java | 36 ++++++++++++++-------- .../pulsar/broker/service/PersistentTopicTest.java | 1 + .../pulsar/broker/service/ServerCnxTest.java | 4 ++- 4 files changed, 37 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 9cc427aae67..5912dca1339 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1014,6 +1014,15 @@ public class NamespaceService implements AutoCloseable { } } + public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName) { + Optional<CompletableFuture<OwnedBundle>> res = ownershipCache.getOwnedBundleAsync(getBundle(topicName)); + if (!res.isPresent()) { + return CompletableFuture.completedFuture(false); + } + + return res.get().thenApply(ob -> ob != null && ob.isActive()); + } + private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception { return ownershipCache.getOwnedBundle(getFullBundle(fqnn)) != null; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 7b442698e92..09601033ca3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1348,7 +1348,7 @@ public class BrokerService implements Closeable { final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); if (topicLoadSemaphore.tryAcquire()) { - createPersistentTopic(topic, createIfMissing, topicFuture, properties); + checkOwnershipAndCreatePersistentTopic(topic, createIfMissing, topicFuture, properties); topicFuture.handle((persistentTopic, ex) -> { // release permit and process pending topic topicLoadSemaphore.release(); @@ -1369,20 +1369,32 @@ public class BrokerService implements Closeable { return topicFuture; } - private void createPersistentTopic(final String topic, boolean createIfMissing, + private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean createIfMissing, CompletableFuture<Optional<Topic>> topicFuture, Map<String, String> properties) { + TopicName topicName = TopicName.get(topic); + pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName) + .thenAccept(isActive -> { + if (isActive) { + createPersistentTopic(topic, createIfMissing, topicFuture, properties); + } else { + // namespace is being unloaded + String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic); + log.warn(msg); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); + } + }).exceptionally(ex -> { + topicFuture.completeExceptionally(ex); + return null; + }); + } - final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + private void createPersistentTopic(final String topic, boolean createIfMissing, + CompletableFuture<Optional<Topic>> topicFuture, + Map<String, String> properties) { TopicName topicName = TopicName.get(topic); - if (!pulsar.getNamespaceService().isServiceUnitActive(topicName)) { - // namespace is being unloaded - String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic); - log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); - return; - } + final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); if (isTransactionSystemTopic(topicName)) { String msg = String.format("Can not create transaction system topic %s", topic); @@ -2496,7 +2508,7 @@ public class BrokerService implements Closeable { CompletableFuture<Optional<Topic>> pendingFuture = pendingTopic.getTopicFuture(); final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); final boolean acquiredPermit = topicLoadSemaphore.tryAcquire(); - createPersistentTopic(topic, true, pendingFuture, pendingTopic.getProperties()); + checkOwnershipAndCreatePersistentTopic(topic, true, pendingFuture, pendingTopic.getProperties()); pendingFuture.handle((persistentTopic, ex) -> { // release permit and process next pending topic if (acquiredPermit) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index e376ec990fd..811a300342a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -228,6 +228,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { doReturn(nsSvc).when(pulsar).getNamespaceService(); doReturn(true).when(nsSvc).isServiceUnitOwned(any()); doReturn(true).when(nsSvc).isServiceUnitActive(any()); + doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any()); setupMLAsyncCallbackMocks(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 0cdc8184afd..12a4f0db18d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -198,6 +198,7 @@ public class ServerCnxTest { doReturn(namespaceService).when(pulsar).getNamespaceService(); doReturn(true).when(namespaceService).isServiceUnitOwned(any()); doReturn(true).when(namespaceService).isServiceUnitActive(any()); + doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any()); setupMLAsyncCallbackMocks(); @@ -469,7 +470,8 @@ public class ServerCnxTest { setChannelConnected(); // Force the case where the broker doesn't own any topic - doReturn(false).when(namespaceService).isServiceUnitActive(any(TopicName.class)); + doReturn(CompletableFuture.completedFuture(false)).when(namespaceService) + .isServiceUnitActiveAsync(any(TopicName.class)); // test PRODUCER failure case ByteBuf clientCommand = Commands.newProducer(nonOwnedTopicName, 1 /* producer id */, 1 /* request id */,