This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 893fef98209 Fixed deadlock when checking topic ownership (#16310) 893fef98209 is described below commit 893fef982090b24b97ed8bab2581da7ed24d3513 Author: Matteo Merli <mme...@apache.org> AuthorDate: Sat Jul 2 17:02:07 2022 -0700 Fixed deadlock when checking topic ownership (#16310) * Fixed deadlock when checking topic ownership * Fixed mocked test * Fixed ServerCnxTest * Fixed testProducerOnNotOwnedTopic --- .../pulsar/broker/namespace/NamespaceService.java | 9 ++++++ .../pulsar/broker/service/BrokerService.java | 36 ++++++++++++++-------- .../pulsar/broker/service/PersistentTopicTest.java | 1 + .../pulsar/broker/service/ServerCnxTest.java | 4 ++- 4 files changed, 37 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index e50f82cfa9e..86fd5b58662 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1020,6 +1020,15 @@ public class NamespaceService implements AutoCloseable { } } + public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName) { + Optional<CompletableFuture<OwnedBundle>> res = ownershipCache.getOwnedBundleAsync(getBundle(topicName)); + if (!res.isPresent()) { + return CompletableFuture.completedFuture(false); + } + + return res.get().thenApply(ob -> ob != null && ob.isActive()); + } + private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception { return ownershipCache.getOwnedBundle(getFullBundle(fqnn)) != null; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index a83da67eb1e..7f6c256a3e8 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1325,7 +1325,7 @@ public class BrokerService implements Closeable { final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); if (topicLoadSemaphore.tryAcquire()) { - createPersistentTopic(topic, createIfMissing, topicFuture, properties); + checkOwnershipAndCreatePersistentTopic(topic, createIfMissing, topicFuture, properties); topicFuture.handle((persistentTopic, ex) -> { // release permit and process pending topic topicLoadSemaphore.release(); @@ -1346,20 +1346,32 @@ public class BrokerService implements Closeable { return topicFuture; } - private void createPersistentTopic(final String topic, boolean createIfMissing, + private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean createIfMissing, CompletableFuture<Optional<Topic>> topicFuture, Map<String, String> properties) { + TopicName topicName = TopicName.get(topic); + pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName) + .thenAccept(isActive -> { + if (isActive) { + createPersistentTopic(topic, createIfMissing, topicFuture, properties); + } else { + // namespace is being unloaded + String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic); + log.warn(msg); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); + } + }).exceptionally(ex -> { + topicFuture.completeExceptionally(ex); + return null; + }); + } - final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + private void createPersistentTopic(final String topic, boolean createIfMissing, + CompletableFuture<Optional<Topic>> topicFuture, + Map<String, String> properties) { TopicName topicName = TopicName.get(topic); - if (!pulsar.getNamespaceService().isServiceUnitActive(topicName)) { - // 
namespace is being unloaded - String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic); - log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); - return; - } + final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); if (isTransactionInternalName(topicName)) { String msg = String.format("Can not create transaction system topic %s", topic); @@ -2595,7 +2607,7 @@ public class BrokerService implements Closeable { CompletableFuture<Optional<Topic>> pendingFuture = pendingTopic.getTopicFuture(); final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); final boolean acquiredPermit = topicLoadSemaphore.tryAcquire(); - createPersistentTopic(topic, true, pendingFuture, pendingTopic.getProperties()); + checkOwnershipAndCreatePersistentTopic(topic, true, pendingFuture, pendingTopic.getProperties()); pendingFuture.handle((persistentTopic, ex) -> { // release permit and process next pending topic if (acquiredPermit) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index a9559ac96b9..a65f80e3fc7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -238,6 +238,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { doReturn(nsSvc).when(pulsar).getNamespaceService(); doReturn(true).when(nsSvc).isServiceUnitOwned(any()); doReturn(true).when(nsSvc).isServiceUnitActive(any()); + doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any()); setupMLAsyncCallbackMocks(); diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index dd86d6189d1..4a4f411ddb8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -214,6 +214,7 @@ public class ServerCnxTest { doReturn(namespaceService).when(pulsar).getNamespaceService(); doReturn(true).when(namespaceService).isServiceUnitOwned(any()); doReturn(true).when(namespaceService).isServiceUnitActive(any()); + doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any()); doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics( NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL); @@ -489,7 +490,8 @@ public class ServerCnxTest { setChannelConnected(); // Force the case where the broker doesn't own any topic - doReturn(false).when(namespaceService).isServiceUnitActive(any(TopicName.class)); + doReturn(CompletableFuture.completedFuture(false)).when(namespaceService) + .isServiceUnitActiveAsync(any(TopicName.class)); // test PRODUCER failure case ByteBuf clientCommand = Commands.newProducer(nonOwnedTopicName, 1 /* producer id */, 1 /* request id */,