This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 830ae26c322 [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call (#16946)
830ae26c322 is described below
commit 830ae26c322cf477a917cf12593f0f4227ce2b36
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Tue Sep 13 10:49:35 2022 +0800
[Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call (#16946)
(cherry picked from commit 046068a78e61fb2237f6be9cac6c64dd3d0a68e9)
---
.../apache/pulsar/broker/admin/AdminResource.java | 4 ++
.../broker/admin/impl/PersistentTopicsBase.java | 83 ++++++++++++----------
.../pulsar/broker/admin/PersistentTopicsTest.java | 35 +++++++++
3 files changed, 85 insertions(+), 37 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 0e252cac61b..30526a0787d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -801,4 +801,8 @@ public abstract class AdminResource extends PulsarWebResource {
protected static String getTopicNotFoundErrorMessage(String topic) {
return String.format("Topic %s not found", topic);
}
+
+ protected static String getSubNotFoundErrorMessage(String topic, String
subscription) {
+ return String.format("Subscription %s not found for topic %s",
subscription, topic);
+ }
}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 90c964dbd23..6acb35ddede 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4973,26 +4973,35 @@ public class PersistentTopicsBase extends AdminResource {
getPartitionedTopicMetadataAsync(topicName,
authoritative, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
- final List<CompletableFuture<Map<String, Boolean>>>
futures = Lists.newArrayList();
- final Map<String, Boolean> status = Maps.newHashMap();
+ List<CompletableFuture<Void>> futures = new
ArrayList<>(partitionMetadata.partitions);
+ Map<String, Boolean> status = Maps.newHashMap();
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName partition = topicName.getPartition(i);
- try {
-
futures.add(pulsar().getAdminClient().topics().getReplicatedSubscriptionStatusAsync(
- partition.toString(),
subName).whenComplete((response, throwable) -> {
- if (throwable != null) {
- log.error("[{}] Failed to get replicated
subscriptions on {} {}",
- clientAppId(), partition, subName,
throwable);
- asyncResponse.resume(new
RestException(throwable));
- }
- status.putAll(response);
- }));
- } catch (Exception e) {
- log.warn("[{}] Failed to get replicated
subscription status on {} {}",
- clientAppId(), partition, subName, e);
- throw new RestException(e);
- }
+ futures.add(
+
pulsar().getNamespaceService().isServiceUnitOwnedAsync(partition)
+ .thenCompose(owned -> {
+ if (owned) {
+ return
getReplicatedSubscriptionStatusFromLocalBroker(partition, subName);
+ } else {
+ try {
+ return
pulsar().getAdminClient().topics()
+
.getReplicatedSubscriptionStatusAsync(partition.toString(), subName)
+ .whenComplete((__,
throwable) -> {
+ if (throwable
!= null) {
+
log.error("[{}] Failed to get replicated subscriptions on"
+
+ " {} {}",
+
clientAppId(), partition, subName, throwable);
+ }
+ });
+ } catch (Exception e) {
+ log.warn("[{}] Failed to
get replicated subscription status on {} {}",
+ clientAppId(),
partition, subName, e);
+ return
FutureUtil.failedFuture(e);
+ }
+ }
+ }).thenAccept(status::putAll)
+ );
}
FutureUtil.waitForAll(futures).handle((result, exception)
-> {
@@ -5029,33 +5038,33 @@ public class PersistentTopicsBase extends AdminResource {
}
}
- private void
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse
asyncResponse,
-
String subName,
-
boolean authoritative) {
- try {
- // Redirect the request to the appropriate broker if this broker
is not the owner of the topic
- validateTopicOwnership(topicName, authoritative);
-
- Topic topic = getTopicReference(topicName);
- if (topic == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Topic not found"));
- return;
- }
-
+ private CompletableFuture<Map<String, Boolean>>
getReplicatedSubscriptionStatusFromLocalBroker(
+ TopicName localTopicName,
+ String subName) {
+ return getTopicReferenceAsync(localTopicName).thenCompose(topic -> {
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Subscription not found"));
- return;
+ return FutureUtil.failedFuture(new
RestException(Status.NOT_FOUND,
+ getSubNotFoundErrorMessage(localTopicName.toString(),
subName)));
}
-
if (topic instanceof PersistentTopic && sub instanceof
PersistentSubscription) {
- Map res = Maps.newHashMap();
- res.put(topicName.toString(), sub.isReplicated());
- asyncResponse.resume(res);
+ return CompletableFuture.completedFuture(
+ Collections.singletonMap(localTopicName.toString(),
sub.isReplicated()));
} else {
- asyncResponse.resume(new
RestException(Status.METHOD_NOT_ALLOWED,
+ return FutureUtil.failedFuture(new
RestException(Status.METHOD_NOT_ALLOWED,
"Cannot get replicated subscriptions on non-persistent
topics"));
}
+ });
+ }
+
+ private void
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse
asyncResponse,
+
String subName,
+
boolean authoritative) {
+ try {
+ // Redirect the request to the appropriate broker if this broker
is not the owner of the topic
+ validateTopicOwnership(topicName, authoritative);
+
+ getReplicatedSubscriptionStatusFromLocalBroker(topicName,
subName).get();
} catch (Exception e) {
log.error("[{}] Failed to get replicated subscription status on {}
{}", clientAppId(),
topicName, subName, e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index bd86a666c0f..08942eae8dc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
@@ -55,6 +56,7 @@ import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
import org.apache.pulsar.broker.admin.v2.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
+import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TopicResources;
@@ -62,7 +64,9 @@ import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
@@ -1271,4 +1275,35 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
}
}
+ @Test
+ public void testInternalGetReplicatedSubscriptionStatusFromLocal() throws
Exception {
+ String topicName = "persistent://" + testTenant + "/" + testNamespace
+ + "/testInternalGetReplicatedSubscriptionStatusFromLocal";
+ String subName =
"sub_testInternalGetReplicatedSubscriptionStatusFromLocal";
+ TopicName topic = TopicName.get(topicName);
+ admin.topics().createPartitionedTopic(topicName, 2);
+ admin.topics().createSubscription(topicName, subName,
MessageId.latest);
+
+ // partition-0 call from local and partition-1 call from admin.
+ NamespaceService namespaceService = spy(pulsar.getNamespaceService());
+ doReturn(CompletableFuture.completedFuture(true))
+
.when(namespaceService).isServiceUnitOwnedAsync(topic.getPartition(0));
+ doReturn(CompletableFuture.completedFuture(false))
+
.when(namespaceService).isServiceUnitOwnedAsync(topic.getPartition(1));
+
+ doReturn(namespaceService).when(pulsar).getNamespaceService();
+
+ PulsarAdmin adminFromPulsar = spy(pulsar.getAdminClient());
+ doReturn(adminFromPulsar).when(pulsar).getAdminClient();
+ Topics topics = spy(adminFromPulsar.topics());
+ doReturn(topics).when(adminFromPulsar).topics();
+
+ AsyncResponse response = mock(AsyncResponse.class);
+ persistentTopics.getReplicatedSubscriptionStatus(response, testTenant,
testNamespace, topic.getLocalName(),
+ subName, false);
+ verify(response, timeout(5000).times(1)).resume(any());
+
+ // verify we only call getReplicatedSubscriptionStatusAsync once.
+ verify(topics, times(1)).getReplicatedSubscriptionStatusAsync(any(),
any());
+ }
}