poorbarcode commented on code in PR #21810: URL: https://github.com/apache/pulsar/pull/21810#discussion_r1437525415
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ########## @@ -3061,39 +3062,67 @@ public CompletableFuture<Void> onPoliciesUpdate(@Nonnull Policies data) { return CompletableFuture.completedFuture(null); } + // Update props. + // The component "EntryFilters" is update in the method "updateTopicPolicyByNamespacePolicy(data)". + // see more detail: https://github.com/apache/pulsar/pull/19364. updateTopicPolicyByNamespacePolicy(data); checkReplicatedSubscriptionControllerState(); isEncryptionRequired = data.encryption_required; - isAllowAutoUpdateSchema = data.is_allow_auto_update_schema; - updateDispatchRateLimiter(); - - updateSubscribeRateLimiter(); - - updatePublishDispatcher(); - - updateResourceGroupLimiter(data); + // Update components. + return FutureUtil.waitForAll(updateComponentPolicies(data)) + .thenAccept(__ -> log.info("[{}] namespace-level policies updated successfully", topic)) + .exceptionally(ex -> { + log.error("[{}] update namespace polices : {} error", this.getName(), data, ex); + throw FutureUtil.wrapToCompletionException(ex); + }); + } - List<CompletableFuture<Void>> producerCheckFutures = new ArrayList<>(producers.size()); - producers.values().forEach(producer -> producerCheckFutures.add( - producer.checkPermissionsAsync().thenRun(producer::checkEncryption))); + private List<CompletableFuture<Void>> updateComponentPolicies(Policies namespacePolicies) { + List<CompletableFuture<Void>> updateComponentsFutureList = new ArrayList<>(); - return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> { - return updateSubscriptionsDispatcherRateLimiter().thenCompose((___) -> { - replicators.forEach((name, replicator) -> replicator.updateRateLimiter()); - shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter()); - checkMessageExpiry(); - CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure(); - CompletableFuture<Void> dedupFuture = checkDeduplicationStatus(); - CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies(); - return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture, - preCreateSubscriptionForCompactionIfNeeded()); - }); - }).exceptionally(ex -> { - log.error("[{}] update namespace polices : {} error", this.getName(), data, ex); - throw FutureUtil.wrapToCompletionException(ex); + // Client permission check. + subscriptions.forEach((subName, sub) -> { + sub.getConsumers().forEach(consumer -> updateComponentsFutureList.add(consumer.checkPermissionsAsync())); }); + producers.values().forEach(producer -> updateComponentsFutureList.add( + producer.checkPermissionsAsync().thenRun(producer::checkEncryption))); + // Check message expiry. + updateComponentsFutureList.add(CompletableFuture.runAsync( + () -> checkMessageExpiry(), MoreExecutors.directExecutor())); + + // Update rate limiters. + updateComponentsFutureList.add(CompletableFuture.runAsync( + () -> updateDispatchRateLimiter(), MoreExecutors.directExecutor())); + updateComponentsFutureList.add(CompletableFuture.runAsync( + () -> updateSubscribeRateLimiter(), MoreExecutors.directExecutor())); + updateComponentsFutureList.add(CompletableFuture.runAsync( + () -> updatePublishRateLimiter(), MoreExecutors.directExecutor())); + if (namespacePolicies != null) { + updateComponentsFutureList.add(CompletableFuture.runAsync( + () -> updateResourceGroupLimiter(namespacePolicies), MoreExecutors.directExecutor())); + } Review Comment: > Oh, sorry. I thought it was able to get from somewhere before. Pass via the params is better. :) Sure, reverted -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org