This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 37aca83  Avoid potentially blocking calls to metadata on critical threads (#12339)
37aca83 is described below

commit 37aca83588ae90c93694fc2afc7349f24bcb8c88
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Thu Oct 14 12:33:17 2021 -0700

    Avoid potentially blocking calls to metadata on critical threads (#12339)
    
    * Avoid potentially blocking calls to metadata on critical threads
    
    * Fixed log arguments order
    
    * Addressed comments
    
    * Fixed mock in PersistentSubscriptionTest
    
    * Fixed issue in mocked tests
    
    * Fixed test that was forcing policies modification under the hood
---
 .../authorization/PulsarAuthorizationProvider.java | 67 +++++++++++-----------
 .../pulsar/broker/service/AbstractTopic.java       | 60 +++++++++----------
 .../pulsar/broker/service/BrokerService.java       | 35 ++++-------
 .../service/persistent/DispatchRateLimiter.java    |  7 +--
 .../persistent/PersistentSubscriptionTest.java     |  9 ++-
 5 files changed, 78 insertions(+), 100 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 005707f..e355b12 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.authorization;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import com.google.common.base.Function;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
@@ -46,6 +47,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.RestException;
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -322,48 +324,43 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
         return updateSubscriptionPermissionAsync(namespace, subscriptionName, 
Collections.singleton(role), true);
     }
 
-    private CompletableFuture<Void> 
updateSubscriptionPermissionAsync(NamespaceName namespace, String 
subscriptionName, Set<String> roles,
-            boolean remove) {
-        CompletableFuture<Void> result = new CompletableFuture<>();
-
+    private CompletableFuture<Void> 
updateSubscriptionPermissionAsync(NamespaceName namespace, String 
subscriptionName,
+                                                                      
Set<String> roles,
+                                                                      boolean 
remove) {
         try {
             validatePoliciesReadOnlyAccess();
         } catch (Exception e) {
-            result.completeExceptionally(e);
+            return FutureUtil.failedFuture(e);
         }
 
-        try {
-            Policies policies = 
pulsarResources.getNamespaceResources().getPolicies(namespace)
-                    .orElseThrow(() -> new NotFoundException(namespace + " not 
found"));
-            if (remove) {
-                if 
(policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName) 
!= null) {
-                    
policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName).removeAll(roles);
-                }else {
-                    log.info("[{}] Couldn't find role {} while revoking for 
sub = {}", namespace, subscriptionName, roles);
-                    result.completeExceptionally(new 
IllegalArgumentException("couldn't find subscription"));
-                    return result;
-                }
-            } else {
-                
policies.auth_policies.getSubscriptionAuthentication().put(subscriptionName, 
roles);
-            }
-            pulsarResources.getNamespaceResources().setPolicies(namespace, 
(data)->policies);
+        CompletableFuture<Void> future =
+                
pulsarResources.getNamespaceResources().setPoliciesAsync(namespace, policies -> 
{
+                    if (remove) {
+                        Set<String> subscriptionAuth =
+                                
policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName);
+                        if (subscriptionAuth != null) {
+                            subscriptionAuth.removeAll(roles);
+                        } else {
+                            log.info("[{}] Couldn't find role {} while 
revoking for sub = {}", namespace,
+                                    roles, subscriptionName);
+                            throw new IllegalArgumentException("couldn't find 
subscription");
+                        }
+                    } else {
+                        
policies.auth_policies.getSubscriptionAuthentication().put(subscriptionName, 
roles);
+                    }
+                    return policies;
+                }).thenRun(() -> {
+                    log.info("[{}] Successfully granted access for role {} for 
sub = {}", namespace, subscriptionName,
+                            roles);
+                });
 
-            log.info("[{}] Successfully granted access for role {} for sub = 
{}", namespace, subscriptionName, roles);
-            result.complete(null);
-        } catch (NotFoundException e) {
-            log.warn("[{}] Failed to set permissions for namespace {}: does 
not exist", subscriptionName, namespace);
-            result.completeExceptionally(new 
IllegalArgumentException("Namespace does not exist" + namespace));
-        } catch (BadVersionException e) {
-            log.warn("[{}] Failed to set permissions for {} on namespace {}: 
concurrent modification", subscriptionName, roles, namespace);
-            result.completeExceptionally(new IllegalStateException(
-                    "Concurrent modification on metadata path: " + namespace + 
", " + e.getMessage()));
-        } catch (Exception e) {
-            log.error("[{}] Failed to get permissions for role {} on namespace 
{}", subscriptionName, roles, namespace, e);
-            result.completeExceptionally(
-                    new IllegalStateException("Failed to get permissions for 
namespace " + namespace));
-        }
+        future.exceptionally(ex -> {
+            log.error("[{}] Failed to get permissions for role {} on namespace 
{}", subscriptionName, roles, namespace,
+                    ex);
+            return null;
+        });
 
-        return result;
+        return future;
     }
 
     private CompletableFuture<Boolean> checkAuthorization(TopicName topicName, 
String role, AuthAction action) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 1e5710b..951d602 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -146,31 +146,18 @@ public abstract class AbstractTopic implements Topic {
         this.topicMaxMessageSizeCheckIntervalMs = 
TimeUnit.SECONDS.toMillis(brokerService.pulsar().getConfiguration()
                 .getMaxMessageSizeCheckIntervalInSeconds());
         this.lastActive = System.nanoTime();
-        Policies policies = null;
-        try {
-            policies = 
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(
-                            TopicName.get(topic).getNamespaceObject())
-                    .orElseGet(() -> new Policies());
-        } catch (Exception e) {
-            log.warn("[{}] Error getting policies {} and publish throttling 
will be disabled", topic, e.getMessage());
-        }
         this.preciseTopicPublishRateLimitingEnable =
                 
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
-        updatePublishDispatcher(policies);
+        updatePublishDispatcher(Optional.empty());
     }
 
     protected boolean isProducersExceeded() {
         Integer maxProducers = 
getTopicPolicies().map(TopicPolicies::getMaxProducerPerTopic).orElse(null);
 
         if (maxProducers == null) {
-            Policies policies;
-            try {
-                policies = 
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(
-                                TopicName.get(topic).getNamespaceObject())
-                        .orElseGet(() -> new Policies());
-            } catch (Exception e) {
-                policies = new Policies();
-            }
+            Policies policies = 
brokerService.pulsar().getPulsarResources().getNamespaceResources()
+                    
.getPoliciesIfCached(TopicName.get(topic).getNamespaceObject())
+                    .orElseGet(() -> new Policies());
             maxProducers = policies.max_producers_per_topic;
         }
         maxProducers = maxProducers != null ? maxProducers : 
brokerService.pulsar()
@@ -208,21 +195,12 @@ public abstract class AbstractTopic implements Topic {
     protected boolean isConsumersExceededOnTopic() {
         Integer maxConsumers = 
getTopicPolicies().map(TopicPolicies::getMaxConsumerPerTopic).orElse(null);
         if (maxConsumers == null) {
-            Policies policies;
-            try {
-                // Use getDataIfPresent from zk cache to make the call 
non-blocking and prevent deadlocks
-                policies = 
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(
-                                TopicName.get(topic).getNamespaceObject())
-                        .orElseGet(() -> new Policies());
 
-                if (policies == null) {
-                    policies = new Policies();
-                }
-            } catch (Exception e) {
-                log.warn("[{}] Failed to get namespace policies that include 
max number of consumers: {}", topic,
-                        e.getMessage());
-                policies = new Policies();
-            }
+            // Use getDataIfPresent from zk cache to make the call 
non-blocking and prevent deadlocks
+            Policies policies = 
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(
+                            TopicName.get(topic).getNamespaceObject())
+                    .orElseGet(() -> new Policies());
+
             maxConsumers = policies.max_consumers_per_topic;
         }
         final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers
@@ -789,10 +767,10 @@ public abstract class AbstractTopic implements Topic {
     }
 
     public void updateMaxPublishRate(Policies policies) {
-        updatePublishDispatcher(policies);
+        updatePublishDispatcher(Optional.of(policies));
     }
 
-    private void updatePublishDispatcher(Policies policies) {
+    private void updatePublishDispatcher(Optional<Policies> optPolicies) {
         //if topic-level policy exists, try to use topic-level publish rate 
policy
         Optional<PublishRate> topicPublishRate = 
getTopicPolicies().map(TopicPolicies::getPublishRate);
         if (topicPublishRate.isPresent()) {
@@ -802,9 +780,23 @@ public abstract class AbstractTopic implements Topic {
             return;
         }
 
+        Policies policies;
+        try {
+            if (optPolicies.isPresent()) {
+                policies = optPolicies.get();
+            } else {
+                policies = 
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(
+                                TopicName.get(topic).getNamespaceObject())
+                        .orElseGet(() -> new Policies());
+            }
+        } catch (Exception e) {
+            log.warn("[{}] Error getting policies {} and publish throttling 
will be disabled", topic, e.getMessage());
+            policies = new Policies();
+        }
+
         //topic-level policy is not set, try to use namespace-level rate policy
         final String clusterName = 
brokerService.pulsar().getConfiguration().getClusterName();
-        final PublishRate publishRate = policies != null && 
policies.publishMaxMessageRate != null
+        final PublishRate publishRate = policies.publishMaxMessageRate != null
                 ? policies.publishMaxMessageRate.get(clusterName)
                 : null;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bd3c1e9..71c64c4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2535,18 +2535,12 @@ public class BrokerService implements Closeable {
     }
 
     private AutoTopicCreationOverride getAutoTopicCreationOverride(final 
TopicName topicName) {
-        try {
-            Optional<Policies> policies =
-                    
pulsar.getPulsarResources().getNamespaceResources().getPolicies(topicName.getNamespaceObject());
-            // If namespace policies have the field set, it will override the 
broker-level setting
-            if (policies.isPresent() && 
policies.get().autoTopicCreationOverride != null) {
-                return policies.get().autoTopicCreationOverride;
-            }
-        } catch (Throwable t) {
-            // Ignoring since if we don't have policies, we fallback on the 
default
-            log.warn("Got exception when reading autoTopicCreateOverride 
policy for {}: {};",
-                    topicName, t.getMessage(), t);
-            return null;
+        Optional<Policies> policies =
+                pulsar.getPulsarResources().getNamespaceResources()
+                        .getPoliciesIfCached(topicName.getNamespaceObject());
+        // If namespace policies have the field set, it will override the 
broker-level setting
+        if (policies.isPresent() && policies.get().autoTopicCreationOverride 
!= null) {
+            return policies.get().autoTopicCreationOverride;
         }
         log.debug("No autoTopicCreateOverride policy found for {}", topicName);
         return null;
@@ -2568,18 +2562,11 @@ public class BrokerService implements Closeable {
     }
 
     private AutoSubscriptionCreationOverride 
getAutoSubscriptionCreationOverride(final TopicName topicName) {
-        try {
-            Optional<Policies> policies =
-                    
pulsar.getPulsarResources().getNamespaceResources().getPolicies(topicName.getNamespaceObject());
-            // If namespace policies have the field set, it will override the 
broker-level setting
-            if (policies.isPresent() && 
policies.get().autoSubscriptionCreationOverride != null) {
-                return policies.get().autoSubscriptionCreationOverride;
-            }
-        } catch (Throwable t) {
-            // Ignoring since if we don't have policies, we fallback on the 
default
-            log.warn("Got exception when reading 
autoSubscriptionCreateOverride policy for {}: {};",
-                    topicName, t.getMessage(), t);
-            return null;
+        Optional<Policies> policies =
+                
pulsar.getPulsarResources().getNamespaceResources().getPoliciesIfCached(topicName.getNamespaceObject());
+        // If namespace policies have the field set, it will override the 
broker-level setting
+        if (policies.isPresent() && 
policies.get().autoSubscriptionCreationOverride != null) {
+            return policies.get().autoSubscriptionCreationOverride;
         }
         log.debug("No autoSubscriptionCreateOverride policy found for {}", 
topicName);
         return null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 696995d..47bb638 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -320,12 +320,7 @@ public class DispatchRateLimiter {
 
     public static Optional<Policies> getPolicies(BrokerService brokerService, 
String topicName) {
         final NamespaceName namespace = 
TopicName.get(topicName).getNamespaceObject();
-        try {
-            return 
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(namespace);
-        } catch (Exception e) {
-            log.warn("Failed to get message-rate for {} ", topicName, e);
-            return Optional.empty();
-        }
+        return 
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(namespace);
     }
 
     /**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index 8be9b81..76f485e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -54,6 +54,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -120,7 +121,13 @@ public class PersistentSubscriptionTest {
         svcConfig.setBrokerShutdownTimeoutMs(0L);
         svcConfig.setTransactionCoordinatorEnabled(true);
         pulsarMock = spy(new PulsarService(svcConfig));
-        
doReturn(mock(PulsarResources.class)).when(pulsarMock).getPulsarResources();
+        PulsarResources pulsarResources = mock(PulsarResources.class);
+        doReturn(pulsarResources).when(pulsarMock).getPulsarResources();
+        NamespaceResources namespaceResources = mock(NamespaceResources.class);
+        
doReturn(namespaceResources).when(pulsarResources).getNamespaceResources();
+
+        doReturn(Optional.of(new 
Policies())).when(namespaceResources).getPoliciesIfCached(any());
+
         doReturn(new 
InMemTransactionBufferProvider()).when(pulsarMock).getTransactionBufferProvider();
         doReturn(new TransactionPendingAckStoreProvider() {
             @Override

Reply via email to