This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new d0cff5fc7b8 [fix] [broker] Incorrect service name selection logic 
(#19505)
d0cff5fc7b8 is described below

commit d0cff5fc7b8087ae66ce914f97dac094d8466d81
Author: fengyubiao <[email protected]>
AuthorDate: Wed Feb 15 23:02:30 2023 +0800

    [fix] [broker] Incorrect service name selection logic (#19505)
    
    When calling the method `PulsarWebResource.getRedirectionUrl`, reuse the 
same `PulsarServiceNameResolver` instance.
    
    (cherry picked from commit f9af4245e0b05c382656fc674fdaeda26487258c)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 16 ++++++-----
 .../pulsar/broker/web/PulsarWebResource.java       | 31 ++++++++++++++++++----
 .../pulsar/broker/service/ReplicatorTest.java      |  3 ++-
 .../client/impl/PulsarServiceNameResolver.java     |  5 ++--
 4 files changed, 40 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 6acb35ddede..1be743a7ee9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4982,20 +4982,24 @@ public class PersistentTopicsBase extends AdminResource 
{
                                 
pulsar().getNamespaceService().isServiceUnitOwnedAsync(partition)
                                         .thenCompose(owned -> {
                                             if (owned) {
-                                                return 
getReplicatedSubscriptionStatusFromLocalBroker(partition, subName);
+                                                return 
getReplicatedSubscriptionStatusFromLocalBroker(partition,
+                                                        subName);
                                             } else {
                                                 try {
                                                     return 
pulsar().getAdminClient().topics()
-                                                            
.getReplicatedSubscriptionStatusAsync(partition.toString(), subName)
+                                                            
.getReplicatedSubscriptionStatusAsync(partition.toString(),
+                                                                    subName)
                                                             .whenComplete((__, 
throwable) -> {
                                                                 if (throwable 
!= null) {
-                                                                    
log.error("[{}] Failed to get replicated subscriptions on"
-                                                                               
     + " {} {}",
-                                                                            
clientAppId(), partition, subName, throwable);
+                                                                    
log.error("[{}] Failed to get replicated "
+                                                                               
     + "subscriptions on {} {}",
+                                                                            
clientAppId(), partition, subName,
+                                                                            
throwable);
                                                                 }
                                                             });
                                                 } catch (Exception e) {
-                                                    log.warn("[{}] Failed to 
get replicated subscription status on {} {}",
+                                                    log.warn("[{}] Failed to 
get replicated subscription status"
+                                                                    + " on {} 
{}",
                                                             clientAppId(), 
partition, subName, e);
                                                     return 
FutureUtil.failedFuture(e);
                                                 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 67904b5ff56..68e5b7da824 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -21,6 +21,9 @@ package org.apache.pulsar.broker.web;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.commons.lang3.StringUtils.isBlank;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.BoundType;
 import com.google.common.collect.Lists;
@@ -29,6 +32,7 @@ import com.google.common.collect.Sets;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
+import java.time.Duration;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -85,6 +89,8 @@ import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.path.PolicyPath;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,6 +101,17 @@ public abstract class PulsarWebResource {
 
     private static final Logger log = 
LoggerFactory.getLogger(PulsarWebResource.class);
 
+    private static final LoadingCache<String, PulsarServiceNameResolver> 
SERVICE_NAME_RESOLVER_CACHE =
+            
Caffeine.newBuilder().expireAfterAccess(Duration.ofMinutes(5)).build(
+                    new CacheLoader<String, PulsarServiceNameResolver>() {
+                        @Override
+                        public @Nullable PulsarServiceNameResolver 
load(@NonNull String serviceUrl) throws Exception {
+                            PulsarServiceNameResolver serviceNameResolver = 
new PulsarServiceNameResolver();
+                            serviceNameResolver.updateServiceUrl(serviceUrl);
+                            return serviceNameResolver;
+                        }
+                    });
+
     static final String ORIGINAL_PRINCIPAL_HEADER = "X-Original-Principal";
 
     @Context
@@ -438,17 +455,21 @@ public abstract class PulsarWebResource {
 
     private URI getRedirectionUrl(ClusterData differentClusterData) throws 
MalformedURLException {
         try {
-            PulsarServiceNameResolver serviceNameResolver = new 
PulsarServiceNameResolver();
+            PulsarServiceNameResolver serviceNameResolver;
             if (isRequestHttps() && 
pulsar.getConfiguration().getWebServicePortTls().isPresent()
                     && 
StringUtils.isNotBlank(differentClusterData.getServiceUrlTls())) {
-                
serviceNameResolver.updateServiceUrl(differentClusterData.getServiceUrlTls());
+                serviceNameResolver = 
SERVICE_NAME_RESOLVER_CACHE.get(differentClusterData.getServiceUrlTls());
             } else {
-                
serviceNameResolver.updateServiceUrl(differentClusterData.getServiceUrl());
+                serviceNameResolver = 
SERVICE_NAME_RESOLVER_CACHE.get(differentClusterData.getServiceUrl());
             }
             URL webUrl = new 
URL(serviceNameResolver.resolveHostUri().toString());
             return 
UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.getHost()).port(webUrl.getPort()).build();
-        } catch (PulsarClientException.InvalidServiceURL exception) {
-            throw new MalformedURLException(exception.getMessage());
+        } catch (Exception exception) {
+            if (exception.getCause() != null
+                    && exception.getCause() instanceof 
PulsarClientException.InvalidServiceURL) {
+                throw new MalformedURLException(exception.getMessage());
+            }
+            throw exception;
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index aeb35f4d39a..2468ace10f8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -230,7 +230,8 @@ public class ReplicatorTest extends ReplicatorTestBase {
         pulsar1.getConfiguration().setAuthorizationEnabled(true);
         //init clusterData
 
-        String cluster2ServiceUrls = 
String.format("%s,localhost:1234,localhost:5678", 
pulsar2.getWebServiceAddress());
+        String cluster2ServiceUrls = 
String.format("%s,localhost:1234,localhost:5678,localhost:5677,localhost:5676",
+                pulsar2.getWebServiceAddress());
         ClusterData cluster2Data = 
ClusterData.builder().serviceUrl(cluster2ServiceUrls).build();
         String cluster2 = "activeCLuster2";
         admin2.clusters().createCluster(cluster2, cluster2Data);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
index 1b90f25d926..007313ef6e8 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
@@ -52,9 +52,8 @@ public class PulsarServiceNameResolver implements 
ServiceNameResolver {
         if (list.size() == 1) {
             return list.get(0);
         } else {
-            CURRENT_INDEX_UPDATER.getAndUpdate(this, last -> (last + 1) % 
list.size());
-            return list.get(currentIndex);
-
+            int originalIndex = CURRENT_INDEX_UPDATER.getAndUpdate(this, last 
-> (last + 1) % list.size());
+            return list.get((originalIndex + 1) % list.size());
         }
     }
 

Reply via email to