This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new d0cff5fc7b8 [fix] [broker] Incorrect service name selection logic (#19505)
d0cff5fc7b8 is described below
commit d0cff5fc7b8087ae66ce914f97dac094d8466d81
Author: fengyubiao <[email protected]>
AuthorDate: Wed Feb 15 23:02:30 2023 +0800
[fix] [broker] Incorrect service name selection logic (#19505)
When calling the method `PulsarWebResource.getRedirectionUrl`, reuse the
same `PulsarServiceNameResolver` instance.
(cherry picked from commit f9af4245e0b05c382656fc674fdaeda26487258c)
---
.../broker/admin/impl/PersistentTopicsBase.java | 16 ++++++-----
.../pulsar/broker/web/PulsarWebResource.java | 31 ++++++++++++++++++----
.../pulsar/broker/service/ReplicatorTest.java | 3 ++-
.../client/impl/PulsarServiceNameResolver.java | 5 ++--
4 files changed, 40 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 6acb35ddede..1be743a7ee9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4982,20 +4982,24 @@ public class PersistentTopicsBase extends AdminResource
{
pulsar().getNamespaceService().isServiceUnitOwnedAsync(partition)
.thenCompose(owned -> {
if (owned) {
- return
getReplicatedSubscriptionStatusFromLocalBroker(partition, subName);
+ return
getReplicatedSubscriptionStatusFromLocalBroker(partition,
+ subName);
} else {
try {
return
pulsar().getAdminClient().topics()
-
.getReplicatedSubscriptionStatusAsync(partition.toString(), subName)
+
.getReplicatedSubscriptionStatusAsync(partition.toString(),
+ subName)
.whenComplete((__,
throwable) -> {
if (throwable
!= null) {
-
log.error("[{}] Failed to get replicated subscriptions on"
-
+ " {} {}",
-
clientAppId(), partition, subName, throwable);
+
log.error("[{}] Failed to get replicated "
+
+ "subscriptions on {} {}",
+
clientAppId(), partition, subName,
+
throwable);
}
});
} catch (Exception e) {
- log.warn("[{}] Failed to
get replicated subscription status on {} {}",
+ log.warn("[{}] Failed to
get replicated subscription status"
+ + " on {}
{}",
clientAppId(),
partition, subName, e);
return
FutureUtil.failedFuture(e);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 67904b5ff56..68e5b7da824 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -21,6 +21,9 @@ package org.apache.pulsar.broker.web;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isBlank;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.BoundType;
import com.google.common.collect.Lists;
@@ -29,6 +32,7 @@ import com.google.common.collect.Sets;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
+import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -85,6 +89,8 @@ import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.path.PolicyPath;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,6 +101,17 @@ public abstract class PulsarWebResource {
private static final Logger log =
LoggerFactory.getLogger(PulsarWebResource.class);
+ private static final LoadingCache<String, PulsarServiceNameResolver>
SERVICE_NAME_RESOLVER_CACHE =
+
Caffeine.newBuilder().expireAfterAccess(Duration.ofMinutes(5)).build(
+ new CacheLoader<String, PulsarServiceNameResolver>() {
+ @Override
+ public @Nullable PulsarServiceNameResolver
load(@NonNull String serviceUrl) throws Exception {
+ PulsarServiceNameResolver serviceNameResolver =
new PulsarServiceNameResolver();
+ serviceNameResolver.updateServiceUrl(serviceUrl);
+ return serviceNameResolver;
+ }
+ });
+
static final String ORIGINAL_PRINCIPAL_HEADER = "X-Original-Principal";
@Context
@@ -438,17 +455,21 @@ public abstract class PulsarWebResource {
private URI getRedirectionUrl(ClusterData differentClusterData) throws
MalformedURLException {
try {
- PulsarServiceNameResolver serviceNameResolver = new
PulsarServiceNameResolver();
+ PulsarServiceNameResolver serviceNameResolver;
if (isRequestHttps() &&
pulsar.getConfiguration().getWebServicePortTls().isPresent()
&&
StringUtils.isNotBlank(differentClusterData.getServiceUrlTls())) {
-
serviceNameResolver.updateServiceUrl(differentClusterData.getServiceUrlTls());
+ serviceNameResolver =
SERVICE_NAME_RESOLVER_CACHE.get(differentClusterData.getServiceUrlTls());
} else {
-
serviceNameResolver.updateServiceUrl(differentClusterData.getServiceUrl());
+ serviceNameResolver =
SERVICE_NAME_RESOLVER_CACHE.get(differentClusterData.getServiceUrl());
}
URL webUrl = new
URL(serviceNameResolver.resolveHostUri().toString());
return
UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.getHost()).port(webUrl.getPort()).build();
- } catch (PulsarClientException.InvalidServiceURL exception) {
- throw new MalformedURLException(exception.getMessage());
+ } catch (Exception exception) {
+ if (exception.getCause() != null
+ && exception.getCause() instanceof
PulsarClientException.InvalidServiceURL) {
+ throw new MalformedURLException(exception.getMessage());
+ }
+ throw exception;
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index aeb35f4d39a..2468ace10f8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -230,7 +230,8 @@ public class ReplicatorTest extends ReplicatorTestBase {
pulsar1.getConfiguration().setAuthorizationEnabled(true);
//init clusterData
- String cluster2ServiceUrls =
String.format("%s,localhost:1234,localhost:5678",
pulsar2.getWebServiceAddress());
+ String cluster2ServiceUrls =
String.format("%s,localhost:1234,localhost:5678,localhost:5677,localhost:5676",
+ pulsar2.getWebServiceAddress());
ClusterData cluster2Data =
ClusterData.builder().serviceUrl(cluster2ServiceUrls).build();
String cluster2 = "activeCLuster2";
admin2.clusters().createCluster(cluster2, cluster2Data);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
index 1b90f25d926..007313ef6e8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
@@ -52,9 +52,8 @@ public class PulsarServiceNameResolver implements
ServiceNameResolver {
if (list.size() == 1) {
return list.get(0);
} else {
- CURRENT_INDEX_UPDATER.getAndUpdate(this, last -> (last + 1) %
list.size());
- return list.get(currentIndex);
-
+ int originalIndex = CURRENT_INDEX_UPDATER.getAndUpdate(this, last
-> (last + 1) % list.size());
+ return list.get((originalIndex + 1) % list.size());
}
}