This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new c26c62f  [Branch 2.7] Avoid potentially blocking calls to metadata on 
critical threads (#12339) (#12340)
c26c62f is described below

commit c26c62fa804a559646d0cecd750b79f1e4713ceb
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Oct 13 09:09:29 2021 -0700

    [Branch 2.7] Avoid potentially blocking calls to metadata on critical 
threads (#12339) (#12340)
    
    * Avoid potentially blocking calls to metadata on critical threads (#12339)
    
    * Fixed NPE
    
    * Addressed comments
    
    * Fixed issue with mocked tests
    
    * Fixed behavior in BacklogQuotaManager to be like before
    
    * Fixed AuthorizationProducerConsumerTest
    
    * Fixed PersistentTopicTest
    
    * Fixed PersistentTopicTest
---
 .../authorization/PulsarAuthorizationProvider.java |  4 +--
 .../pulsar/broker/lookup/TopicLookupBase.java      | 31 +++---------------
 .../pulsar/broker/service/AbstractTopic.java       | 38 +++++++---------------
 .../pulsar/broker/service/BacklogQuotaManager.java | 14 +++++---
 .../pulsar/broker/service/BrokerService.java       | 12 ++++---
 .../apache/pulsar/broker/service/ServerCnx.java    |  3 +-
 .../service/persistent/DispatchRateLimiter.java    | 11 +++----
 .../broker/service/persistent/PersistentTopic.java | 23 +++++++++----
 .../pulsar/broker/service/PersistentTopicTest.java | 10 ++++++
 .../api/AuthorizationProducerConsumerTest.java     |  4 +--
 10 files changed, 71 insertions(+), 79 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 934d058..12c289f 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -338,13 +338,13 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
     }
 
     private CompletableFuture<Void> 
updateSubscriptionPermissionAsync(NamespaceName namespace, String 
subscriptionName, Set<String> roles,
-            boolean remove) {
+                                                                      boolean 
remove) {
         CompletableFuture<Void> result = new CompletableFuture<>();
-
         try {
             validatePoliciesReadOnlyAccess();
         } catch (Exception e) {
             result.completeExceptionally(e);
+            return result;
         }
 
         ZooKeeper globalZk = configCache.getZooKeeper();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index 19ddce75..67b3268 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -163,37 +163,14 @@ public class TopicLookupBase extends PulsarWebResource {
      * @param authoritative
      * @param clientAppId
      * @param requestId
-     * @return
-     */
-    public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService 
pulsarService, TopicName topicName,
-            boolean authoritative, String clientAppId, 
AuthenticationDataSource authenticationData, long requestId) {
-        return lookupTopicAsync(pulsarService, topicName, authoritative, 
clientAppId, authenticationData, requestId, null);
-    }
-
-    /**
-     *
-     * Lookup broker-service address for a given namespace-bundle which 
contains given topic.
-     *
-     * a. Returns broker-address if namespace-bundle is already owned by any 
broker
-     * b. If current-broker receives lookup-request and if it's not a leader 
then current broker redirects request
-     *    to leader by returning leader-service address.
-     * c. If current-broker is leader then it finds out least-loaded broker to 
own namespace bundle and redirects request
-     *    by returning least-loaded broker.
-     * d. If current-broker receives request to own the namespace-bundle then 
it owns a bundle and returns success(connect)
-     *    response to client.
-     *
-     * @param pulsarService
-     * @param topicName
-     * @param authoritative
-     * @param clientAppId
-     * @param requestId
      * @param advertisedListenerName
      * @return
      */
     public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService 
pulsarService, TopicName topicName,
                                                               boolean 
authoritative, String clientAppId,
                                                               
AuthenticationDataSource authenticationData, long requestId,
-                                                              final String 
advertisedListenerName) {
+                                                              final String 
advertisedListenerName,
+                                                              boolean 
isAlreadyAuthorized) {
 
         final CompletableFuture<ByteBuf> validationFuture = new 
CompletableFuture<>();
         final CompletableFuture<ByteBuf> lookupfuture = new 
CompletableFuture<>();
@@ -213,7 +190,9 @@ public class TopicLookupBase extends PulsarWebResource {
             } else {
                 // (2) authorize client
                 try {
-                    checkAuthorization(pulsarService, topicName, clientAppId, 
authenticationData);
+                    if (!isAlreadyAuthorized) {
+                        checkAuthorization(pulsarService, topicName, 
clientAppId, authenticationData);
+                    }
                 } catch (RestException authException) {
                     log.warn("Failed to authorized {} on cluster {}", 
clientAppId, topicName.toString());
                     
validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 23b2221..339e89d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -114,13 +114,13 @@ public abstract class AbstractTopic implements Topic {
         
this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds());
         
this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMode());
         this.lastActive = System.nanoTime();
-        Policies policies = null;
+        Policies policies;
         try {
             policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()))
-                    .orElseGet(() -> new Policies());
+                    .getDataIfPresent(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()));
         } catch (Exception e) {
             log.warn("[{}] Error getting policies {} and publish throttling 
will be disabled", topic, e.getMessage());
+            policies = new Policies();
         }
         this.preciseTopicPublishRateLimitingEnable =
                 
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
@@ -135,15 +135,11 @@ public abstract class AbstractTopic implements Topic {
         }
 
         if (maxProducers == null) {
-            Policies policies;
-            try {
-                policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
-                        .get(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()))
-                        .orElseGet(() -> new Policies());
-            } catch (Exception e) {
-                policies = new Policies();
+            Policies policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
+                        .getDataIfPresent(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()));
+            if (policies != null) {
+                maxProducers = policies.max_producers_per_topic;
             }
-            maxProducers = policies.max_producers_per_topic;
         }
         maxProducers = maxProducers != null ? maxProducers : 
brokerService.pulsar()
                 .getConfiguration().getMaxProducersPerTopic();
@@ -160,22 +156,12 @@ public abstract class AbstractTopic implements Topic {
             maxConsumers = topicPolicies.getMaxConsumerPerTopic();
         }
         if (maxConsumers == null) {
-            Policies policies;
-            try {
-                // Use getDataIfPresent from zk cache to make the call 
non-blocking and prevent deadlocks
-                policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
-                        .getDataIfPresent(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()));
-
-                if (policies == null) {
-                    policies = new Policies();
-                }
-            } catch (Exception e) {
-                log.warn("[{}] Failed to get namespace policies that include 
max number of consumers: {}", topic,
-                        e.getMessage());
-                policies = new Policies();
-            }
-            maxConsumers = policies.max_consumers_per_topic;
+            // Use getDataIfPresent from zk cache to make the call 
non-blocking and prevent deadlocks
+            Policies policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
+                    .getDataIfPresent(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()));
+            maxConsumers = policies != null ? policies.max_consumers_per_topic 
: 0;
         }
+
         final int maxConsumersPerTopic = maxConsumers > 0 ? maxConsumers
                 : 
brokerService.pulsar().getConfiguration().getMaxConsumersPerTopic();
         if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= 
getNumberOfConsumers()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 020efa1..63e3e98 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -65,13 +65,17 @@ public class BacklogQuotaManager {
     }
 
     public BacklogQuota getBacklogQuota(String namespace, String policyPath) {
+        Policies policies = null;
         try {
-            return zkCache.get(policyPath)
-                    .map(p -> 
p.backlog_quota_map.getOrDefault(BacklogQuotaType.destination_storage, 
defaultQuota))
-                    .orElse(defaultQuota);
+             policies = zkCache.getDataIfPresent(policyPath);
         } catch (Exception e) {
-            log.warn("Failed to read policies data, will apply the default 
backlog quota: namespace={}", namespace, e);
-            return this.defaultQuota;
+            log.warn("Failed to check policies for path {}: {}", policyPath, 
e);
+        }
+
+        if (policies != null) {
+            return 
policies.backlog_quota_map.getOrDefault(BacklogQuotaType.destination_storage, 
defaultQuota);
+        } else {
+            return defaultQuota;
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b7eca90..d9c015e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2414,8 +2414,10 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
 
     private AutoTopicCreationOverride getAutoTopicCreationOverride(final 
TopicName topicName) {
         try {
-            Optional<Policies> policies = 
pulsar.getConfigurationCache().policiesCache()
-                            .get(AdminResource.path(POLICIES, 
topicName.getNamespace()));
+            Optional<Policies> policies =
+                    
Optional.ofNullable(pulsar.getConfigurationCache().policiesCache().getDataIfPresent(
+                            AdminResource.path(POLICIES, 
topicName.getNamespace().toString()))
+                    );
             // If namespace policies have the field set, it will override the 
broker-level setting
             if (policies.isPresent() && 
policies.get().autoTopicCreationOverride != null) {
                 return policies.get().autoTopicCreationOverride;
@@ -2445,8 +2447,10 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
 
     private AutoSubscriptionCreationOverride 
getAutoSubscriptionCreationOverride(final TopicName topicName) {
         try {
-            Optional<Policies> policies = 
pulsar.getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, 
topicName.getNamespace()));
+            Optional<Policies> policies =
+                    
Optional.ofNullable(pulsar.getConfigurationCache().policiesCache().getDataIfPresent(
+                            AdminResource.path(POLICIES, 
topicName.getNamespace().toString()))
+                    );
             // If namespace policies have the field set, it will override the 
broker-level setting
             if (policies.isPresent() && 
policies.get().autoSubscriptionCreationOverride != null) {
                 return policies.get().autoSubscriptionCreationOverride;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index fa67aa4..22e2d28 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -378,7 +378,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 if (isAuthorized) {
                     lookupTopicAsync(getBrokerService().pulsar(), topicName, 
authoritative,
                             getPrincipal(), getAuthenticationData(),
-                            requestId, 
advertisedListenerName).handle((lookupResponse, ex) -> {
+                            requestId, advertisedListenerName,
+                            true /* isAlreadyAuthorized 
*/).handle((lookupResponse, ex) -> {
                                 if (ex == null) {
                                     ctx.writeAndFlush(lookupResponse);
                                 } else {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index b2447a2..1ddb0a6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -18,9 +18,6 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.pulsar.broker.web.PulsarWebResource.path;
-
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -28,6 +25,7 @@ import java.util.function.Supplier;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -171,7 +169,7 @@ public class DispatchRateLimiter {
             return true;
         }
 
-        policies = policies.isPresent() ? policies : 
getPolicies(brokerService, topicName);
+        policies = policies.isPresent() ? policies :  
getPolicies(brokerService, topicName);
         return isDispatchRateNeeded(serviceConfig, policies, topicName, type);
     }
 
@@ -302,13 +300,12 @@ public class DispatchRateLimiter {
 
     public static Optional<Policies> getPolicies(BrokerService brokerService, 
String topicName) {
         final NamespaceName namespace = 
TopicName.get(topicName).getNamespaceObject();
-        final String path = path(POLICIES, namespace.toString());
         Optional<Policies> policies = Optional.empty();
         try {
             ConfigurationCacheService configurationCacheService = 
brokerService.pulsar().getConfigurationCache();
             if (configurationCacheService != null) {
-                policies = 
configurationCacheService.policiesCache().getAsync(path)
-                        
.get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(),
 SECONDS);
+                return 
Optional.ofNullable(configurationCacheService.policiesCache()
+                        .getDataIfPresent(AdminResource.path(POLICIES, 
namespace.toString())));
             }
         } catch (Exception e) {
             log.warn("Failed to get message-rate for {} ", topicName, e);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c40d4cc..d76fb28 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -518,8 +518,11 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         // read repl-cluster from policies to avoid restart of replicator 
which are in process of disconnect and close
         try {
             Policies policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()))
-                    .orElseThrow(() -> new KeeperException.NoNodeException());
+                    .getDataIfPresent(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()));
+            if (policies == null) {
+                throw new KeeperException.NoNodeException();
+            }
+
             if (policies.replication_clusters != null) {
                 Set<String> configuredClusters = 
Sets.newTreeSet(policies.replication_clusters);
                 replicators.forEach((region, replicator) -> {
@@ -2010,8 +2013,14 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         TopicName name = TopicName.get(topic);
         try {
             Policies policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, name.getNamespace()))
-                    .orElseThrow(() -> new KeeperException.NoNodeException());
+                    .getDataIfPresent(AdminResource.path(POLICIES, 
name.getNamespace()));
+            if (policies == null) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Error getting policies", topic);
+                }
+                return;
+            }
+
             final int defaultExpirationTime = 
brokerService.pulsar().getConfiguration()
                     .getSubscriptionExpirationTimeMinutes();
             final long expirationTimeMillis = TimeUnit.MINUTES
@@ -2457,8 +2466,10 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         TopicName name = TopicName.get(topic);
         TopicPolicies topicPolicies = getTopicPolicies(name);
         Policies policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
-                .get(AdminResource.path(POLICIES, name.getNamespace()))
-                .orElseThrow(KeeperException.NoNodeException::new);
+                .getDataIfPresent(AdminResource.path(POLICIES, 
name.getNamespace()));
+        if (policies == null) {
+          throw new KeeperException.NoNodeException();
+        }
         if (topicPolicies != null && topicPolicies.isMessageTTLSet()) {
             return topicPolicies.getMessageTTLInSeconds();
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 41c15c7..2537c99 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -517,6 +517,9 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         when(pulsar.getConfigurationCache().policiesCache()
                 .get(AdminResource.path(POLICIES, 
TopicName.get(successTopicName).getNamespace())))
                 .thenReturn(Optional.of(policies));
+        when(pulsar.getConfigurationCache().policiesCache()
+                .getDataIfPresent(AdminResource.path(POLICIES, 
TopicName.get(successTopicName).getNamespace())))
+                .thenReturn(policies);
         testMaxProducers();
     }
 
@@ -1439,6 +1442,9 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         when(pulsar.getConfigurationCache().policiesCache()
                 .get(AdminResource.path(POLICIES, 
TopicName.get(globalTopicName).getNamespace())))
                         .thenReturn(Optional.of(new Policies()));
+        when(pulsar.getConfigurationCache().policiesCache()
+                .getDataIfPresent(AdminResource.path(POLICIES, 
TopicName.get(globalTopicName).getNamespace())))
+                .thenReturn(new Policies());
         // try to start replicator again
         topic.startReplProducers();
         // verify: replicator.startProducer is not invoked
@@ -1748,6 +1754,10 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
                 .get(AdminResource.path(POLICIES, 
TopicName.get(successTopicName).getNamespace())))
                 .thenReturn(Optional.of(new Policies()));
 
+        when(pulsar.getConfigurationCache().policiesCache()
+                .getDataIfPresent(AdminResource.path(POLICIES, 
TopicName.get(successTopicName).getNamespace())))
+                .thenReturn(new Policies());
+
         ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
         doReturn(5).when(svcConfig).getSubscriptionExpirationTimeMinutes();
         doReturn(svcConfig).when(pulsar).getConfiguration();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index d43f657..2e071af 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -542,13 +542,13 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
         @Override
         public CompletableFuture<Boolean> allowTopicOperationAsync(
             TopicName topic, String role, TopicOperation operation, 
AuthenticationDataSource authData) {
-            return CompletableFuture.completedFuture(true);
+            return 
CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
         }
 
         @Override
         public Boolean allowTopicOperation(
             TopicName topicName, String role, TopicOperation operation, 
AuthenticationDataSource authData) {
-            return true;
+            return clientAuthProviderSupportedRoles.contains(role);
         }
     }
 

Reply via email to