This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push: new 37aca83 Avoid potentially blocking calls to metadata on critical threads (#12339) 37aca83 is described below commit 37aca83588ae90c93694fc2afc7349f24bcb8c88 Author: Matteo Merli <mme...@apache.org> AuthorDate: Thu Oct 14 12:33:17 2021 -0700 Avoid potentially blocking calls to metadata on critical threads (#12339) * Avoid potentially blocking calls to metadata on critical threads * Fixed log arguments order * Addressed comments * Fixed mock in PersistentSubscriptionTest * Fixed issue in mocked tests * Fixed test that was force policies modification under the hood --- .../authorization/PulsarAuthorizationProvider.java | 67 +++++++++++----------- .../pulsar/broker/service/AbstractTopic.java | 60 +++++++++---------- .../pulsar/broker/service/BrokerService.java | 35 ++++------- .../service/persistent/DispatchRateLimiter.java | 7 +-- .../persistent/PersistentSubscriptionTest.java | 9 ++- 5 files changed, 78 insertions(+), 100 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 005707f..e355b12 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -20,6 +20,7 @@ package org.apache.pulsar.broker.authorization; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import com.google.common.base.Function; import java.io.IOException; import java.util.Collections; import java.util.Map; @@ -46,6 +47,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.RestException; import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -322,48 +324,43 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider { return updateSubscriptionPermissionAsync(namespace, subscriptionName, Collections.singleton(role), true); } - private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, Set<String> roles, - boolean remove) { - CompletableFuture<Void> result = new CompletableFuture<>(); - + private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, + Set<String> roles, + boolean remove) { try { validatePoliciesReadOnlyAccess(); } catch (Exception e) { - result.completeExceptionally(e); + return FutureUtil.failedFuture(e); } - try { - Policies policies = pulsarResources.getNamespaceResources().getPolicies(namespace) - .orElseThrow(() -> new NotFoundException(namespace + " not found")); - if (remove) { - if (policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName) != null) { - policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName).removeAll(roles); - }else { - log.info("[{}] Couldn't find role {} while revoking for sub = {}", namespace, subscriptionName, roles); - result.completeExceptionally(new IllegalArgumentException("couldn't find subscription")); - return result; - } - } else { - policies.auth_policies.getSubscriptionAuthentication().put(subscriptionName, roles); - } - pulsarResources.getNamespaceResources().setPolicies(namespace, (data)->policies); + CompletableFuture<Void> future = + pulsarResources.getNamespaceResources().setPoliciesAsync(namespace, policies -> { + if (remove) { + Set<String> subscriptionAuth = + policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName); + if (subscriptionAuth != null) { + subscriptionAuth.removeAll(roles); + } else { + log.info("[{}] Couldn't find role {} while revoking for sub = {}", namespace, + roles, subscriptionName); + throw new IllegalArgumentException("couldn't find subscription"); + } + } else { + policies.auth_policies.getSubscriptionAuthentication().put(subscriptionName, roles); + } + return policies; + }).thenRun(() -> { + log.info("[{}] Successfully granted access for role {} for sub = {}", namespace, subscriptionName, + roles); + }); - log.info("[{}] Successfully granted access for role {} for sub = {}", namespace, subscriptionName, roles); - result.complete(null); - } catch (NotFoundException e) { - log.warn("[{}] Failed to set permissions for namespace {}: does not exist", subscriptionName, namespace); - result.completeExceptionally(new IllegalArgumentException("Namespace does not exist" + namespace)); - } catch (BadVersionException e) { - log.warn("[{}] Failed to set permissions for {} on namespace {}: concurrent modification", subscriptionName, roles, namespace); - result.completeExceptionally(new IllegalStateException( - "Concurrent modification on metadata path: " + namespace + ", " + e.getMessage())); - } catch (Exception e) { - log.error("[{}] Failed to get permissions for role {} on namespace {}", subscriptionName, roles, namespace, e); - result.completeExceptionally( - new IllegalStateException("Failed to get permissions for namespace " + namespace)); - } + future.exceptionally(ex -> { + log.error("[{}] Failed to get permissions for role {} on namespace {}", subscriptionName, roles, namespace, + ex); + return null; + }); - return result; + return future; } private CompletableFuture<Boolean> checkAuthorization(TopicName topicName, String role, AuthAction action) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 1e5710b..951d602 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -146,31 +146,18 @@ public abstract class AbstractTopic implements Topic { this.topicMaxMessageSizeCheckIntervalMs = TimeUnit.SECONDS.toMillis(brokerService.pulsar().getConfiguration() .getMaxMessageSizeCheckIntervalInSeconds()); this.lastActive = System.nanoTime(); - Policies policies = null; - try { - policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies( - TopicName.get(topic).getNamespaceObject()) - .orElseGet(() -> new Policies()); - } catch (Exception e) { - log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage()); - } this.preciseTopicPublishRateLimitingEnable = brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable(); - updatePublishDispatcher(policies); + updatePublishDispatcher(Optional.empty()); } protected boolean isProducersExceeded() { Integer maxProducers = getTopicPolicies().map(TopicPolicies::getMaxProducerPerTopic).orElse(null); if (maxProducers == null) { - Policies policies; - try { - policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies( - TopicName.get(topic).getNamespaceObject()) - .orElseGet(() -> new Policies()); - } catch (Exception e) { - policies = new Policies(); - } + Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources() + .getPoliciesIfCached(TopicName.get(topic).getNamespaceObject()) + .orElseGet(() -> new Policies()); maxProducers = policies.max_producers_per_topic; } maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar() @@ -208,21 +195,12 @@ public abstract class AbstractTopic implements Topic { protected boolean isConsumersExceededOnTopic() { Integer maxConsumers = getTopicPolicies().map(TopicPolicies::getMaxConsumerPerTopic).orElse(null); if (maxConsumers == null) { - Policies policies; - try { - // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks - policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies( - TopicName.get(topic).getNamespaceObject()) - .orElseGet(() -> new Policies()); - if (policies == null) { - policies = new Policies(); - } - } catch (Exception e) { - log.warn("[{}] Failed to get namespace policies that include max number of consumers: {}", topic, - e.getMessage()); - policies = new Policies(); - } + // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks + Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached( + TopicName.get(topic).getNamespaceObject()) + .orElseGet(() -> new Policies()); + maxConsumers = policies.max_consumers_per_topic; } final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers @@ -789,10 +767,10 @@ public abstract class AbstractTopic implements Topic { } public void updateMaxPublishRate(Policies policies) { - updatePublishDispatcher(policies); + updatePublishDispatcher(Optional.of(policies)); } - private void updatePublishDispatcher(Policies policies) { + private void updatePublishDispatcher(Optional<Policies> optPolicies) { //if topic-level policy exists, try to use topic-level publish rate policy Optional<PublishRate> topicPublishRate = getTopicPolicies().map(TopicPolicies::getPublishRate); if (topicPublishRate.isPresent()) { @@ -802,9 +780,23 @@ public abstract class AbstractTopic implements Topic { return; } + Policies policies; + try { + if (optPolicies.isPresent()) { + policies = optPolicies.get(); + } else { + policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached( + TopicName.get(topic).getNamespaceObject()) + .orElseGet(() -> new Policies()); + } + } catch (Exception e) { + log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage()); + policies = new Policies(); + } + //topic-level policy is not set, try to use namespace-level rate policy final String clusterName = brokerService.pulsar().getConfiguration().getClusterName(); - final PublishRate publishRate = policies != null && policies.publishMaxMessageRate != null + final PublishRate publishRate = policies.publishMaxMessageRate != null ? policies.publishMaxMessageRate.get(clusterName) : null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index bd3c1e9..71c64c4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2535,18 +2535,12 @@ public class BrokerService implements Closeable { } private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) { - try { - Optional<Policies> policies = - pulsar.getPulsarResources().getNamespaceResources().getPolicies(topicName.getNamespaceObject()); - // If namespace policies have the field set, it will override the broker-level setting - if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) { - return policies.get().autoTopicCreationOverride; - } - } catch (Throwable t) { - // Ignoring since if we don't have policies, we fallback on the default - log.warn("Got exception when reading autoTopicCreateOverride policy for {}: {};", - topicName, t.getMessage(), t); - return null; + Optional<Policies> policies = + pulsar.getPulsarResources().getNamespaceResources() + .getPoliciesIfCached(topicName.getNamespaceObject()); + // If namespace policies have the field set, it will override the broker-level setting + if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) { + return policies.get().autoTopicCreationOverride; } log.debug("No autoTopicCreateOverride policy found for {}", topicName); return null; @@ -2568,18 +2562,11 @@ public class BrokerService implements Closeable { } private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) { - try { - Optional<Policies> policies = - pulsar.getPulsarResources().getNamespaceResources().getPolicies(topicName.getNamespaceObject()); - // If namespace policies have the field set, it will override the broker-level setting - if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) { - return policies.get().autoSubscriptionCreationOverride; - } - } catch (Throwable t) { - // Ignoring since if we don't have policies, we fallback on the default - log.warn("Got exception when reading autoSubscriptionCreateOverride policy for {}: {};", - topicName, t.getMessage(), t); - return null; + Optional<Policies> policies = + pulsar.getPulsarResources().getNamespaceResources().getPoliciesIfCached(topicName.getNamespaceObject()); + // If namespace policies have the field set, it will override the broker-level setting + if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) { + return policies.get().autoSubscriptionCreationOverride; } log.debug("No autoSubscriptionCreateOverride policy found for {}", topicName); return null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index 696995d..47bb638 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -320,12 +320,7 @@ public class DispatchRateLimiter { public static Optional<Policies> getPolicies(BrokerService brokerService, String topicName) { final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject(); - try { - return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(namespace); - } catch (Exception e) { - log.warn("Failed to get message-rate for {} ", topicName, e); - return Optional.empty(); - } + return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(namespace); } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java index 8be9b81..76f485e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java @@ -54,6 +54,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -120,7 +121,13 @@ public class PersistentSubscriptionTest { svcConfig.setBrokerShutdownTimeoutMs(0L); svcConfig.setTransactionCoordinatorEnabled(true); pulsarMock = spy(new PulsarService(svcConfig)); - doReturn(mock(PulsarResources.class)).when(pulsarMock).getPulsarResources(); + PulsarResources pulsarResources = mock(PulsarResources.class); + doReturn(pulsarResources).when(pulsarMock).getPulsarResources(); + NamespaceResources namespaceResources = mock(NamespaceResources.class); + doReturn(namespaceResources).when(pulsarResources).getNamespaceResources(); + + doReturn(Optional.of(new Policies())).when(namespaceResources).getPoliciesIfCached(any()); + doReturn(new InMemTransactionBufferProvider()).when(pulsarMock).getTransactionBufferProvider(); doReturn(new TransactionPendingAckStoreProvider() { @Override