This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new c26c62f [Branch 2.7] Avoid potentially blocking calls to metadata on
critical threads (#12339) (#12340)
c26c62f is described below
commit c26c62fa804a559646d0cecd750b79f1e4713ceb
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Oct 13 09:09:29 2021 -0700
[Branch 2.7] Avoid potentially blocking calls to metadata on critical
threads (#12339) (#12340)
* Avoid potentially blocking calls to metadata on critical threads (#12339)
* Fixed NPE
* Addressed comments
* Fixed issue with mocked tests
* Fixed behavior in BacklogQuotaManager to be like before
* Fixed AuthorizationProducerConsumerTest
* Fixed PersistentTopicTest
* Fixed PersistentTopicTest
---
.../authorization/PulsarAuthorizationProvider.java | 4 +--
.../pulsar/broker/lookup/TopicLookupBase.java | 31 +++---------------
.../pulsar/broker/service/AbstractTopic.java | 38 +++++++---------------
.../pulsar/broker/service/BacklogQuotaManager.java | 14 +++++---
.../pulsar/broker/service/BrokerService.java | 12 ++++---
.../apache/pulsar/broker/service/ServerCnx.java | 3 +-
.../service/persistent/DispatchRateLimiter.java | 11 +++----
.../broker/service/persistent/PersistentTopic.java | 23 +++++++++----
.../pulsar/broker/service/PersistentTopicTest.java | 10 ++++++
.../api/AuthorizationProducerConsumerTest.java | 4 +--
10 files changed, 71 insertions(+), 79 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 934d058..12c289f 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -338,13 +338,13 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
}
private CompletableFuture<Void>
updateSubscriptionPermissionAsync(NamespaceName namespace, String
subscriptionName, Set<String> roles,
- boolean remove) {
+ boolean
remove) {
CompletableFuture<Void> result = new CompletableFuture<>();
-
try {
validatePoliciesReadOnlyAccess();
} catch (Exception e) {
result.completeExceptionally(e);
+ return result;
}
ZooKeeper globalZk = configCache.getZooKeeper();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index 19ddce75..67b3268 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -163,37 +163,14 @@ public class TopicLookupBase extends PulsarWebResource {
* @param authoritative
* @param clientAppId
* @param requestId
- * @return
- */
- public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService
pulsarService, TopicName topicName,
- boolean authoritative, String clientAppId,
AuthenticationDataSource authenticationData, long requestId) {
- return lookupTopicAsync(pulsarService, topicName, authoritative,
clientAppId, authenticationData, requestId, null);
- }
-
- /**
- *
- * Lookup broker-service address for a given namespace-bundle which
contains given topic.
- *
- * a. Returns broker-address if namespace-bundle is already owned by any
broker
- * b. If current-broker receives lookup-request and if it's not a leader
then current broker redirects request
- * to leader by returning leader-service address.
- * c. If current-broker is leader then it finds out least-loaded broker to
own namespace bundle and redirects request
- * by returning least-loaded broker.
- * d. If current-broker receives request to own the namespace-bundle then
it owns a bundle and returns success(connect)
- * response to client.
- *
- * @param pulsarService
- * @param topicName
- * @param authoritative
- * @param clientAppId
- * @param requestId
* @param advertisedListenerName
* @return
*/
public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService
pulsarService, TopicName topicName,
boolean
authoritative, String clientAppId,
AuthenticationDataSource authenticationData, long requestId,
- final String
advertisedListenerName) {
+ final String
advertisedListenerName,
+ boolean
isAlreadyAuthorized) {
final CompletableFuture<ByteBuf> validationFuture = new
CompletableFuture<>();
final CompletableFuture<ByteBuf> lookupfuture = new
CompletableFuture<>();
@@ -213,7 +190,9 @@ public class TopicLookupBase extends PulsarWebResource {
} else {
// (2) authorize client
try {
- checkAuthorization(pulsarService, topicName, clientAppId,
authenticationData);
+ if (!isAlreadyAuthorized) {
+ checkAuthorization(pulsarService, topicName,
clientAppId, authenticationData);
+ }
} catch (RestException authException) {
log.warn("Failed to authorized {} on cluster {}",
clientAppId, topicName.toString());
validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 23b2221..339e89d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -114,13 +114,13 @@ public abstract class AbstractTopic implements Topic {
this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds());
this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMode());
this.lastActive = System.nanoTime();
- Policies policies = null;
+ Policies policies;
try {
policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
- .get(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()))
- .orElseGet(() -> new Policies());
+ .getDataIfPresent(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()));
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling
will be disabled", topic, e.getMessage());
+ policies = new Policies();
}
this.preciseTopicPublishRateLimitingEnable =
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
@@ -135,15 +135,11 @@ public abstract class AbstractTopic implements Topic {
}
if (maxProducers == null) {
- Policies policies;
- try {
- policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
- .get(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()))
- .orElseGet(() -> new Policies());
- } catch (Exception e) {
- policies = new Policies();
+ Policies policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
+ .getDataIfPresent(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()));
+ if (policies != null) {
+ maxProducers = policies.max_producers_per_topic;
}
- maxProducers = policies.max_producers_per_topic;
}
maxProducers = maxProducers != null ? maxProducers :
brokerService.pulsar()
.getConfiguration().getMaxProducersPerTopic();
@@ -160,22 +156,12 @@ public abstract class AbstractTopic implements Topic {
maxConsumers = topicPolicies.getMaxConsumerPerTopic();
}
if (maxConsumers == null) {
- Policies policies;
- try {
- // Use getDataIfPresent from zk cache to make the call
non-blocking and prevent deadlocks
- policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
- .getDataIfPresent(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()));
-
- if (policies == null) {
- policies = new Policies();
- }
- } catch (Exception e) {
- log.warn("[{}] Failed to get namespace policies that include
max number of consumers: {}", topic,
- e.getMessage());
- policies = new Policies();
- }
- maxConsumers = policies.max_consumers_per_topic;
+ // Use getDataIfPresent from zk cache to make the call
non-blocking and prevent deadlocks
+ Policies policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
+ .getDataIfPresent(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()));
+ maxConsumers = policies != null ? policies.max_consumers_per_topic
: 0;
}
+
final int maxConsumersPerTopic = maxConsumers > 0 ? maxConsumers
:
brokerService.pulsar().getConfiguration().getMaxConsumersPerTopic();
if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <=
getNumberOfConsumers()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 020efa1..63e3e98 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -65,13 +65,17 @@ public class BacklogQuotaManager {
}
public BacklogQuota getBacklogQuota(String namespace, String policyPath) {
+ Policies policies = null;
try {
- return zkCache.get(policyPath)
- .map(p ->
p.backlog_quota_map.getOrDefault(BacklogQuotaType.destination_storage,
defaultQuota))
- .orElse(defaultQuota);
+ policies = zkCache.getDataIfPresent(policyPath);
} catch (Exception e) {
- log.warn("Failed to read policies data, will apply the default
backlog quota: namespace={}", namespace, e);
- return this.defaultQuota;
+ log.warn("Failed to check policies for path {}: {}", policyPath,
e);
+ }
+
+ if (policies != null) {
+ return
policies.backlog_quota_map.getOrDefault(BacklogQuotaType.destination_storage,
defaultQuota);
+ } else {
+ return defaultQuota;
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b7eca90..d9c015e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2414,8 +2414,10 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
private AutoTopicCreationOverride getAutoTopicCreationOverride(final
TopicName topicName) {
try {
- Optional<Policies> policies =
pulsar.getConfigurationCache().policiesCache()
- .get(AdminResource.path(POLICIES,
topicName.getNamespace()));
+ Optional<Policies> policies =
+
Optional.ofNullable(pulsar.getConfigurationCache().policiesCache().getDataIfPresent(
+ AdminResource.path(POLICIES,
topicName.getNamespace().toString()))
+ );
// If namespace policies have the field set, it will override the
broker-level setting
if (policies.isPresent() &&
policies.get().autoTopicCreationOverride != null) {
return policies.get().autoTopicCreationOverride;
@@ -2445,8 +2447,10 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
private AutoSubscriptionCreationOverride
getAutoSubscriptionCreationOverride(final TopicName topicName) {
try {
- Optional<Policies> policies =
pulsar.getConfigurationCache().policiesCache()
- .get(AdminResource.path(POLICIES,
topicName.getNamespace()));
+ Optional<Policies> policies =
+
Optional.ofNullable(pulsar.getConfigurationCache().policiesCache().getDataIfPresent(
+ AdminResource.path(POLICIES,
topicName.getNamespace().toString()))
+ );
// If namespace policies have the field set, it will override the
broker-level setting
if (policies.isPresent() &&
policies.get().autoSubscriptionCreationOverride != null) {
return policies.get().autoSubscriptionCreationOverride;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index fa67aa4..22e2d28 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -378,7 +378,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (isAuthorized) {
lookupTopicAsync(getBrokerService().pulsar(), topicName,
authoritative,
getPrincipal(), getAuthenticationData(),
- requestId,
advertisedListenerName).handle((lookupResponse, ex) -> {
+ requestId, advertisedListenerName,
+ true /* isAlreadyAuthorized
*/).handle((lookupResponse, ex) -> {
if (ex == null) {
ctx.writeAndFlush(lookupResponse);
} else {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index b2447a2..1ddb0a6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -18,9 +18,6 @@
*/
package org.apache.pulsar.broker.service.persistent;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.pulsar.broker.web.PulsarWebResource.path;
-
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -28,6 +25,7 @@ import java.util.function.Supplier;
import static
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -171,7 +169,7 @@ public class DispatchRateLimiter {
return true;
}
- policies = policies.isPresent() ? policies :
getPolicies(brokerService, topicName);
+ policies = policies.isPresent() ? policies :
getPolicies(brokerService, topicName);
return isDispatchRateNeeded(serviceConfig, policies, topicName, type);
}
@@ -302,13 +300,12 @@ public class DispatchRateLimiter {
public static Optional<Policies> getPolicies(BrokerService brokerService,
String topicName) {
final NamespaceName namespace =
TopicName.get(topicName).getNamespaceObject();
- final String path = path(POLICIES, namespace.toString());
Optional<Policies> policies = Optional.empty();
try {
ConfigurationCacheService configurationCacheService =
brokerService.pulsar().getConfigurationCache();
if (configurationCacheService != null) {
- policies =
configurationCacheService.policiesCache().getAsync(path)
-
.get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(),
SECONDS);
+ return
Optional.ofNullable(configurationCacheService.policiesCache()
+ .getDataIfPresent(AdminResource.path(POLICIES,
namespace.toString())));
}
} catch (Exception e) {
log.warn("Failed to get message-rate for {} ", topicName, e);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c40d4cc..d76fb28 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -518,8 +518,11 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
// read repl-cluster from policies to avoid restart of replicator
which are in process of disconnect and close
try {
Policies policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
- .get(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()))
- .orElseThrow(() -> new KeeperException.NoNodeException());
+ .getDataIfPresent(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()));
+ if (policies == null) {
+ throw new KeeperException.NoNodeException();
+ }
+
if (policies.replication_clusters != null) {
Set<String> configuredClusters =
Sets.newTreeSet(policies.replication_clusters);
replicators.forEach((region, replicator) -> {
@@ -2010,8 +2013,14 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
TopicName name = TopicName.get(topic);
try {
Policies policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
- .get(AdminResource.path(POLICIES, name.getNamespace()))
- .orElseThrow(() -> new KeeperException.NoNodeException());
+ .getDataIfPresent(AdminResource.path(POLICIES,
name.getNamespace()));
+ if (policies == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Error getting policies", topic);
+ }
+ return;
+ }
+
final int defaultExpirationTime =
brokerService.pulsar().getConfiguration()
.getSubscriptionExpirationTimeMinutes();
final long expirationTimeMillis = TimeUnit.MINUTES
@@ -2457,8 +2466,10 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
TopicName name = TopicName.get(topic);
TopicPolicies topicPolicies = getTopicPolicies(name);
Policies policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
- .get(AdminResource.path(POLICIES, name.getNamespace()))
- .orElseThrow(KeeperException.NoNodeException::new);
+ .getDataIfPresent(AdminResource.path(POLICIES,
name.getNamespace()));
+ if (policies == null) {
+ throw new KeeperException.NoNodeException();
+ }
if (topicPolicies != null && topicPolicies.isMessageTTLSet()) {
return topicPolicies.getMessageTTLInSeconds();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 41c15c7..2537c99 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -517,6 +517,9 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
when(pulsar.getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES,
TopicName.get(successTopicName).getNamespace())))
.thenReturn(Optional.of(policies));
+ when(pulsar.getConfigurationCache().policiesCache()
+ .getDataIfPresent(AdminResource.path(POLICIES,
TopicName.get(successTopicName).getNamespace())))
+ .thenReturn(policies);
testMaxProducers();
}
@@ -1439,6 +1442,9 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
when(pulsar.getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES,
TopicName.get(globalTopicName).getNamespace())))
.thenReturn(Optional.of(new Policies()));
+ when(pulsar.getConfigurationCache().policiesCache()
+ .getDataIfPresent(AdminResource.path(POLICIES,
TopicName.get(globalTopicName).getNamespace())))
+ .thenReturn(new Policies());
// try to start replicator again
topic.startReplProducers();
// verify: replicator.startProducer is not invoked
@@ -1748,6 +1754,10 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
.get(AdminResource.path(POLICIES,
TopicName.get(successTopicName).getNamespace())))
.thenReturn(Optional.of(new Policies()));
+ when(pulsar.getConfigurationCache().policiesCache()
+ .getDataIfPresent(AdminResource.path(POLICIES,
TopicName.get(successTopicName).getNamespace())))
+ .thenReturn(new Policies());
+
ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
doReturn(5).when(svcConfig).getSubscriptionExpirationTimeMinutes();
doReturn(svcConfig).when(pulsar).getConfiguration();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index d43f657..2e071af 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -542,13 +542,13 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
@Override
public CompletableFuture<Boolean> allowTopicOperationAsync(
TopicName topic, String role, TopicOperation operation,
AuthenticationDataSource authData) {
- return CompletableFuture.completedFuture(true);
+ return
CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
}
@Override
public Boolean allowTopicOperation(
TopicName topicName, String role, TopicOperation operation,
AuthenticationDataSource authData) {
- return true;
+ return clientAuthProviderSupportedRoles.contains(role);
}
}