This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 05d88f71e179184f5862a3ee4990f0b31b751468
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Sep 25 17:02:53 2025 +0800

    [improve][broker] Replace isServiceUnitActiveAsync with checkTopicNsOwnership (#24780)
    
    (cherry picked from commit 46a76e98d6a6f2a86839d93467c5337bb205f851)
---
 .../pulsar/broker/namespace/NamespaceService.java  | 31 ------------
 .../pulsar/broker/service/BrokerService.java       | 55 +++++++++-------------
 .../PersistentDispatcherFailoverConsumerTest.java  |  2 -
 .../service/PersistentTopicConcurrentTest.java     |  2 -
 .../pulsar/broker/service/PersistentTopicTest.java |  2 -
 .../pulsar/broker/service/ServerCnxTest.java       |  6 +--
 .../client/api/OrphanPersistentTopicTest.java      |  6 +--
 7 files changed, 28 insertions(+), 76 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index a580c7500b7..b5dbddc2ea3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -43,9 +43,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -1225,35 +1223,6 @@ public class NamespaceService implements AutoCloseable {
                 new IllegalArgumentException("Invalid class of 
NamespaceBundle: " + suName.getClass().getName()));
     }
 
-    /**
-     * @deprecated This method is only used in test now.
-     */
-    @Deprecated
-    public boolean isServiceUnitActive(TopicName topicName) {
-        try {
-            return isServiceUnitActiveAsync(topicName).get(pulsar.getConfig()
-                    .getMetadataStoreOperationTimeoutSeconds(), SECONDS);
-        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
-            LOG.warn("Unable to find OwnedBundle for topic in time - [{}]", 
topicName, e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName 
topicName) {
-        // TODO: Add unit tests cover it.
-        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
-            return getBundleAsync(topicName)
-                    .thenCompose(bundle -> 
loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
-        }
-        return getBundleAsync(topicName).thenCompose(bundle -> {
-            Optional<CompletableFuture<OwnedBundle>> optionalFuture = 
ownershipCache.getOwnedBundleAsync(bundle);
-            if (optionalFuture.isEmpty()) {
-                return CompletableFuture.completedFuture(false);
-            }
-            return optionalFuture.get().thenApply(ob -> ob != null && 
ob.isActive());
-        });
-    }
-
     private CompletableFuture<Boolean> isNamespaceOwnedAsync(NamespaceName 
fqnn) {
         // TODO: Add unit tests cover it.
         if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2a8467c68d3..29528942369 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1704,38 +1704,29 @@ public class BrokerService implements Closeable {
                                        CompletableFuture<Optional<Topic>> 
topicFuture,
                                        Map<String, String> properties) {
         TopicName topicName = TopicName.get(topic);
-        pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName)
-                .thenAccept(isActive -> {
-                    if (isActive) {
-                        CompletableFuture<Map<String, String>> 
propertiesFuture;
-                        if (properties == null) {
-                            //Read properties from storage when loading topic.
-                            propertiesFuture = 
fetchTopicPropertiesAsync(topicName);
-                        } else {
-                            propertiesFuture = 
CompletableFuture.completedFuture(properties);
-                        }
-                        propertiesFuture.thenAccept(finalProperties ->
-                                //TODO add topicName in properties?
-                                createPersistentTopic0(topic, createIfMissing, 
topicFuture,
-                                        finalProperties)
-                        ).exceptionally(throwable -> {
-                            log.warn("[{}] Read topic property failed", topic, 
throwable);
-                            pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
-                            topicFuture.completeExceptionally(throwable);
-                            return null;
-                        });
-                    } else {
-                        // namespace is being unloaded
-                        String msg = String.format("Namespace is being 
unloaded, cannot add topic %s", topic);
-                        log.warn(msg);
-                        pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
-                        topicFuture.completeExceptionally(new 
ServiceUnitNotReadyException(msg));
-                    }
-                }).exceptionally(ex -> {
-                    pulsar.getExecutor().execute(() -> topics.remove(topic, 
topicFuture));
-                    topicFuture.completeExceptionally(ex);
-                    return null;
-                });
+        checkTopicNsOwnership(topic).thenRun(() -> {
+            CompletableFuture<Map<String, String>> propertiesFuture;
+            if (properties == null) {
+                //Read properties from storage when loading topic.
+                propertiesFuture = fetchTopicPropertiesAsync(topicName);
+            } else {
+                propertiesFuture = 
CompletableFuture.completedFuture(properties);
+            }
+            propertiesFuture.thenAccept(finalProperties ->
+                    //TODO add topicName in properties?
+                    createPersistentTopic0(topic, createIfMissing, topicFuture,
+                            finalProperties)
+            ).exceptionally(throwable -> {
+                log.warn("[{}] Read topic property failed", topic, throwable);
+                pulsar.getExecutor().execute(() -> topics.remove(topic, 
topicFuture));
+                topicFuture.completeExceptionally(throwable);
+                return null;
+            });
+        }).exceptionally(e -> {
+            pulsar.getExecutor().execute(() -> topics.remove(topic, 
topicFuture));
+            topicFuture.completeExceptionally(e.getCause());
+            return null;
+        });
     }
 
     @VisibleForTesting
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 96ca2d90f06..37cf75d84ca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -79,7 +79,6 @@ import 
org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.naming.NamespaceBundle;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
@@ -162,7 +161,6 @@ public class PersistentDispatcherFailoverConsumerTest {
 
         NamespaceService nsSvc = 
pulsarTestContext.getPulsarService().getNamespaceService();
         
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
-        doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
         
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
         
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(),
 any());
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 20f58f277a3..2f8a9246351 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -51,7 +51,6 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.naming.NamespaceBundle;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -103,7 +102,6 @@ public class PersistentTopicConcurrentTest extends 
MockedBookKeeperTestCase {
         NamespaceService nsSvc = mock(NamespaceService.class);
         doReturn(nsSvc).when(pulsar).getNamespaceService();
         
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
-        doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
         
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
         
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(),
 any());
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 42defbe293f..eaec93f78a1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -225,8 +225,6 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         NamespaceBundle bundle = mock(NamespaceBundle.class);
         
doReturn(CompletableFuture.completedFuture(bundle)).when(nsSvc).getBundleAsync(any());
         doReturn(true).when(nsSvc).isServiceUnitOwned(any());
-        doReturn(true).when(nsSvc).isServiceUnitActive(any());
-        
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
         
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
         
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(),
 any());
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 4d734081e43..2cfbac35bfc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -231,8 +231,6 @@ public class ServerCnxTest {
                 .getBundleAsync(any());
         
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkBundleOwnership(any(),
 any());
         doReturn(true).when(namespaceService).isServiceUnitOwned(any());
-        doReturn(true).when(namespaceService).isServiceUnitActive(any());
-        
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any());
         
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics(
                 NamespaceName.get("use", "ns-abc"), 
CommandGetTopicsOfNamespace.Mode.ALL);
         
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfUserTopics(
@@ -1601,8 +1599,8 @@ public class ServerCnxTest {
         setChannelConnected();
 
         // Force the case where the broker doesn't own any topic
-        
doReturn(CompletableFuture.completedFuture(false)).when(namespaceService)
-                .isServiceUnitActiveAsync(any(TopicName.class));
+        doReturn(CompletableFuture.failedFuture(new 
ServiceUnitNotReadyException("failed"))).when(brokerService)
+                .checkTopicNsOwnership(any(String.class));
 
         // test PRODUCER failure case
         ByteBuf clientCommand = Commands.newProducer(nonOwnedTopicName, 1 /* 
producer id */, 1 /* request id */,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
index b7c323af5bc..3613ba51625 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
@@ -245,7 +245,7 @@ public class OrphanPersistentTopicTest extends 
ProducerConsumerBase {
         admin.topics().createNonPartitionedTopic(tpName);
         admin.namespaces().unload(ns);
 
-        // Inject an error when calling 
"NamespaceService.isServiceUnitActiveAsync".
+        // Inject an error when loading the topic
         AtomicInteger failedTimes = new AtomicInteger();
         NamespaceService namespaceService = pulsar.getNamespaceService();
         doAnswer(invocation -> {
@@ -258,7 +258,7 @@ public class OrphanPersistentTopicTest extends 
ProducerConsumerBase {
                 return CompletableFuture.failedFuture(new 
RuntimeException("mocked error"));
             }
             return invocation.callRealMethod();
-        
}).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class));
+        }).when(namespaceService).checkBundleOwnership(any(TopicName.class), 
any());
 
         // Verify: the consumer can create successfully eventually.
         Consumer consumer = 
pulsarClient.newConsumer().topic(tpName).subscriptionName("s1").subscribe();
@@ -295,7 +295,7 @@ public class OrphanPersistentTopicTest extends 
ProducerConsumerBase {
                 
pulsar.getDefaultManagedLedgerFactory().delete(TopicName.get(tpName).getPersistenceNamingEncoding());
             }
             return invocation.callRealMethod();
-        
}).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class));
+        }).when(namespaceService).checkBundleOwnership(any(TopicName.class), 
any());
 
         // Verify: the consumer create failed due to pulsar does not allow to 
create topic automatically.
         try {

Reply via email to