poorbarcode commented on code in PR #21810: URL: https://github.com/apache/pulsar/pull/21810#discussion_r1440055261
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ########## @@ -3061,39 +3062,67 @@ public CompletableFuture<Void> onPoliciesUpdate(@Nonnull Policies data) { return CompletableFuture.completedFuture(null); } + // Update props. + // The component "EntryFilters" is update in the method "updateTopicPolicyByNamespacePolicy(data)". + // see more detail: https://github.com/apache/pulsar/pull/19364. updateTopicPolicyByNamespacePolicy(data); checkReplicatedSubscriptionControllerState(); isEncryptionRequired = data.encryption_required; - isAllowAutoUpdateSchema = data.is_allow_auto_update_schema; - updateDispatchRateLimiter(); - - updateSubscribeRateLimiter(); - - updatePublishDispatcher(); - - updateResourceGroupLimiter(data); + // Apply policies for components. + return FutureUtil.waitForAll(applyUpdatedPolicies(data)) + .thenAccept(__ -> log.info("[{}] namespace-level policies updated successfully", topic)) + .exceptionally(ex -> { + log.error("[{}] update namespace polices : {} error", this.getName(), data, ex); + throw FutureUtil.wrapToCompletionException(ex); + }); + } - List<CompletableFuture<Void>> producerCheckFutures = new ArrayList<>(producers.size()); - producers.values().forEach(producer -> producerCheckFutures.add( - producer.checkPermissionsAsync().thenRun(producer::checkEncryption))); + private List<CompletableFuture<Void>> applyUpdatedPolicies(Policies namespaceLevelPolicies) { + List<CompletableFuture<Void>> applyPoliciesFutureList = new ArrayList<>(); - return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> { - return updateSubscriptionsDispatcherRateLimiter().thenCompose((___) -> { - replicators.forEach((name, replicator) -> replicator.updateRateLimiter()); - shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter()); - checkMessageExpiry(); - CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure(); - CompletableFuture<Void> dedupFuture = 
checkDeduplicationStatus(); - CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies(); - return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture, - preCreateSubscriptionForCompactionIfNeeded()); - }); - }).exceptionally(ex -> { - log.error("[{}] update namespace polices : {} error", this.getName(), data, ex); - throw FutureUtil.wrapToCompletionException(ex); + // Client permission check. + subscriptions.forEach((subName, sub) -> { + sub.getConsumers().forEach(consumer -> applyPoliciesFutureList.add(consumer.checkPermissionsAsync())); }); + producers.values().forEach(producer -> applyPoliciesFutureList.add( + producer.checkPermissionsAsync().thenRun(producer::checkEncryption))); + // Check message expiry. + applyPoliciesFutureList.add(CompletableFuture.runAsync( + () -> checkMessageExpiry(), MoreExecutors.directExecutor())); + + // Update rate limiters. + applyPoliciesFutureList.add(CompletableFuture.runAsync( + () -> updateDispatchRateLimiter(), MoreExecutors.directExecutor())); + applyPoliciesFutureList.add(CompletableFuture.runAsync( + () -> updateSubscribeRateLimiter(), MoreExecutors.directExecutor())); + applyPoliciesFutureList.add(CompletableFuture.runAsync( + () -> updatePublishRateLimiter(), MoreExecutors.directExecutor())); + if (namespaceLevelPolicies != null) { + applyPoliciesFutureList.add(CompletableFuture.runAsync( + () -> updateResourceGroupLimiter(namespaceLevelPolicies), MoreExecutors.directExecutor())); + } + applyPoliciesFutureList.add(CompletableFuture.runAsync( + () -> updateSubscriptionsDispatcherRateLimiter(), MoreExecutors.directExecutor())); + applyPoliciesFutureList.add(CompletableFuture.runAsync( + () -> replicators.forEach((name, replicator) -> replicator.updateRateLimiter()), + MoreExecutors.directExecutor())); + applyPoliciesFutureList.add(CompletableFuture.runAsync( + () -> shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter()), + 
MoreExecutors.directExecutor())); + + // Other components. + applyPoliciesFutureList.add(CompletableFuture.runAsync( + () -> checkReplicationAndRetryOnFailure(), MoreExecutors.directExecutor())); + applyPoliciesFutureList.add(CompletableFuture.runAsync( + () -> checkDeduplicationStatus(), MoreExecutors.directExecutor())); + applyPoliciesFutureList.add(CompletableFuture.runAsync( + () -> checkPersistencePolicies(), MoreExecutors.directExecutor())); + applyPoliciesFutureList.add(CompletableFuture.runAsync( + () -> preCreateSubscriptionForCompactionIfNeeded(), MoreExecutors.directExecutor())); + + return applyPoliciesFutureList; Review Comment: Sure, I added a utility method to `FutureUtil`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org