This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 830ae26c322 [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call (#16946)
830ae26c322 is described below

commit 830ae26c322cf477a917cf12593f0f4227ce2b36
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Tue Sep 13 10:49:35 2022 +0800

    [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call (#16946)
    
    (cherry picked from commit 046068a78e61fb2237f6be9cac6c64dd3d0a68e9)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  4 ++
 .../broker/admin/impl/PersistentTopicsBase.java    | 83 ++++++++++++----------
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 35 +++++++++
 3 files changed, 85 insertions(+), 37 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 0e252cac61b..30526a0787d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -801,4 +801,8 @@ public abstract class AdminResource extends PulsarWebResource {
     protected static String getTopicNotFoundErrorMessage(String topic) {
         return String.format("Topic %s not found", topic);
     }
+
+    protected static String getSubNotFoundErrorMessage(String topic, String subscription) {
+        return String.format("Subscription %s not found for topic %s", subscription, topic);
+    }
 }
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 90c964dbd23..6acb35ddede 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4973,26 +4973,35 @@ public class PersistentTopicsBase extends AdminResource {
             getPartitionedTopicMetadataAsync(topicName,
                     authoritative, false).thenAccept(partitionMetadata -> {
                 if (partitionMetadata.partitions > 0) {
-                    final List<CompletableFuture<Map<String, Boolean>>> 
futures = Lists.newArrayList();
-                    final Map<String, Boolean> status = Maps.newHashMap();
+                    List<CompletableFuture<Void>> futures = new 
ArrayList<>(partitionMetadata.partitions);
+                    Map<String, Boolean> status = Maps.newHashMap();
 
                     for (int i = 0; i < partitionMetadata.partitions; i++) {
                         TopicName partition = topicName.getPartition(i);
-                        try {
-                            
futures.add(pulsar().getAdminClient().topics().getReplicatedSubscriptionStatusAsync(
-                                    partition.toString(), 
subName).whenComplete((response, throwable) -> {
-                                if (throwable != null) {
-                                    log.error("[{}] Failed to get replicated 
subscriptions on {} {}",
-                                            clientAppId(), partition, subName, 
throwable);
-                                    asyncResponse.resume(new 
RestException(throwable));
-                                }
-                                status.putAll(response);
-                            }));
-                        } catch (Exception e) {
-                            log.warn("[{}] Failed to get replicated 
subscription status on {} {}",
-                                    clientAppId(), partition, subName, e);
-                            throw new RestException(e);
-                        }
+                        futures.add(
+                                
pulsar().getNamespaceService().isServiceUnitOwnedAsync(partition)
+                                        .thenCompose(owned -> {
+                                            if (owned) {
+                                                return 
getReplicatedSubscriptionStatusFromLocalBroker(partition, subName);
+                                            } else {
+                                                try {
+                                                    return 
pulsar().getAdminClient().topics()
+                                                            
.getReplicatedSubscriptionStatusAsync(partition.toString(), subName)
+                                                            .whenComplete((__, 
throwable) -> {
+                                                                if (throwable 
!= null) {
+                                                                    
log.error("[{}] Failed to get replicated subscriptions on"
+                                                                               
     + " {} {}",
+                                                                            
clientAppId(), partition, subName, throwable);
+                                                                }
+                                                            });
+                                                } catch (Exception e) {
+                                                    log.warn("[{}] Failed to 
get replicated subscription status on {} {}",
+                                                            clientAppId(), 
partition, subName, e);
+                                                    return 
FutureUtil.failedFuture(e);
+                                                }
+                                            }
+                                        }).thenAccept(status::putAll)
+                        );
                     }
 
                     FutureUtil.waitForAll(futures).handle((result, exception) 
-> {
@@ -5029,33 +5038,33 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-    private void 
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse 
asyncResponse,
-                                                                               
String subName,
-                                                                               
boolean authoritative) {
-        try {
-            // Redirect the request to the appropriate broker if this broker 
is not the owner of the topic
-            validateTopicOwnership(topicName, authoritative);
-
-            Topic topic = getTopicReference(topicName);
-            if (topic == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Topic not found"));
-                return;
-            }
-
+    private CompletableFuture<Map<String, Boolean>> 
getReplicatedSubscriptionStatusFromLocalBroker(
+            TopicName localTopicName,
+            String subName) {
+        return getTopicReferenceAsync(localTopicName).thenCompose(topic -> {
             Subscription sub = topic.getSubscription(subName);
             if (sub == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Subscription not found"));
-                return;
+                return FutureUtil.failedFuture(new 
RestException(Status.NOT_FOUND,
+                        getSubNotFoundErrorMessage(localTopicName.toString(), 
subName)));
             }
-
             if (topic instanceof PersistentTopic && sub instanceof 
PersistentSubscription) {
-                Map res = Maps.newHashMap();
-                res.put(topicName.toString(), sub.isReplicated());
-                asyncResponse.resume(res);
+                return CompletableFuture.completedFuture(
+                        Collections.singletonMap(localTopicName.toString(), 
sub.isReplicated()));
             } else {
-                asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED,
+                return FutureUtil.failedFuture(new 
RestException(Status.METHOD_NOT_ALLOWED,
                         "Cannot get replicated subscriptions on non-persistent 
topics"));
             }
+        });
+    }
+
+    private void 
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse 
asyncResponse,
+                                                                               
String subName,
+                                                                               
boolean authoritative) {
+        try {
+            // Redirect the request to the appropriate broker if this broker 
is not the owner of the topic
+            validateTopicOwnership(topicName, authoritative);
+
+            getReplicatedSubscriptionStatusFromLocalBroker(topicName, 
subName).get();
         } catch (Exception e) {
             log.error("[{}] Failed to get replicated subscription status on {} 
{}", clientAppId(),
                     topicName, subName, e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index bd86a666c0f..08942eae8dc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import com.google.common.collect.Lists;
@@ -55,6 +56,7 @@ import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
 import org.apache.pulsar.broker.admin.v2.PersistentTopics;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.resources.TopicResources;
@@ -62,7 +64,9 @@ import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Topics;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Message;
@@ -1271,4 +1275,35 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    public void testInternalGetReplicatedSubscriptionStatusFromLocal() throws Exception {
+        String topicName = "persistent://" + testTenant + "/" + testNamespace
+                + "/testInternalGetReplicatedSubscriptionStatusFromLocal";
+        String subName = "sub_testInternalGetReplicatedSubscriptionStatusFromLocal";
+        TopicName topic = TopicName.get(topicName);
+        admin.topics().createPartitionedTopic(topicName, 2);
+        admin.topics().createSubscription(topicName, subName, MessageId.latest);
+
+        // partition-0 call from local and partition-1 call from admin.
+        NamespaceService namespaceService = spy(pulsar.getNamespaceService());
+        doReturn(CompletableFuture.completedFuture(true))
+                .when(namespaceService).isServiceUnitOwnedAsync(topic.getPartition(0));
+        doReturn(CompletableFuture.completedFuture(false))
+                .when(namespaceService).isServiceUnitOwnedAsync(topic.getPartition(1));
+
+        doReturn(namespaceService).when(pulsar).getNamespaceService();
+
+        PulsarAdmin adminFromPulsar = spy(pulsar.getAdminClient());
+        doReturn(adminFromPulsar).when(pulsar).getAdminClient();
+        Topics topics = spy(adminFromPulsar.topics());
+        doReturn(topics).when(adminFromPulsar).topics();
+
+        AsyncResponse response = mock(AsyncResponse.class);
+        persistentTopics.getReplicatedSubscriptionStatus(response, testTenant, testNamespace, topic.getLocalName(),
+                subName, false);
+        verify(response, timeout(5000).times(1)).resume(any());
+
+        // verify we only call getReplicatedSubscriptionStatusAsync once.
+        verify(topics, times(1)).getReplicatedSubscriptionStatusAsync(any(), any());
+    }
 }

Reply via email to