poorbarcode commented on code in PR #21810:
URL: https://github.com/apache/pulsar/pull/21810#discussion_r1437525415


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3061,39 +3062,67 @@ public CompletableFuture<Void> 
onPoliciesUpdate(@Nonnull Policies data) {
             return CompletableFuture.completedFuture(null);
         }
 
+        // Update props.
+        // The component "EntryFilters" is update in the method 
"updateTopicPolicyByNamespacePolicy(data)".
+        //   see more detail: https://github.com/apache/pulsar/pull/19364.
         updateTopicPolicyByNamespacePolicy(data);
         checkReplicatedSubscriptionControllerState();
         isEncryptionRequired = data.encryption_required;
-
         isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
 
-        updateDispatchRateLimiter();
-
-        updateSubscribeRateLimiter();
-
-        updatePublishDispatcher();
-
-        updateResourceGroupLimiter(data);
+        // Update components.
+        return FutureUtil.waitForAll(updateComponentPolicies(data))
+            .thenAccept(__ -> log.info("[{}] namespace-level policies updated 
successfully", topic))
+            .exceptionally(ex -> {
+                log.error("[{}] update namespace polices : {} error", 
this.getName(), data, ex);
+                throw FutureUtil.wrapToCompletionException(ex);
+            });
+    }
 
-        List<CompletableFuture<Void>> producerCheckFutures = new 
ArrayList<>(producers.size());
-        producers.values().forEach(producer -> producerCheckFutures.add(
-                
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
+    private List<CompletableFuture<Void>> updateComponentPolicies(Policies 
namespacePolicies) {
+        List<CompletableFuture<Void>> updateComponentsFutureList = new 
ArrayList<>();
 
-        return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> 
{
-            return 
updateSubscriptionsDispatcherRateLimiter().thenCompose((___) -> {
-                replicators.forEach((name, replicator) -> 
replicator.updateRateLimiter());
-                shadowReplicators.forEach((name, replicator) -> 
replicator.updateRateLimiter());
-                checkMessageExpiry();
-                CompletableFuture<Void> replicationFuture = 
checkReplicationAndRetryOnFailure();
-                CompletableFuture<Void> dedupFuture = 
checkDeduplicationStatus();
-                CompletableFuture<Void> persistentPoliciesFuture = 
checkPersistencePolicies();
-                return CompletableFuture.allOf(replicationFuture, dedupFuture, 
persistentPoliciesFuture,
-                        preCreateSubscriptionForCompactionIfNeeded());
-            });
-        }).exceptionally(ex -> {
-            log.error("[{}] update namespace polices : {} error", 
this.getName(), data, ex);
-            throw FutureUtil.wrapToCompletionException(ex);
+        // Client permission check.
+        subscriptions.forEach((subName, sub) -> {
+            sub.getConsumers().forEach(consumer -> 
updateComponentsFutureList.add(consumer.checkPermissionsAsync()));
         });
+        producers.values().forEach(producer -> updateComponentsFutureList.add(
+                
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
+        // Check message expiry.
+        updateComponentsFutureList.add(CompletableFuture.runAsync(
+                () -> checkMessageExpiry(), MoreExecutors.directExecutor()));
+
+        // Update rate limiters.
+        updateComponentsFutureList.add(CompletableFuture.runAsync(
+                () -> updateDispatchRateLimiter(), 
MoreExecutors.directExecutor()));
+        updateComponentsFutureList.add(CompletableFuture.runAsync(
+                () -> updateSubscribeRateLimiter(), 
MoreExecutors.directExecutor()));
+        updateComponentsFutureList.add(CompletableFuture.runAsync(
+                () -> updatePublishRateLimiter(), 
MoreExecutors.directExecutor()));
+        if (namespacePolicies != null) {
+            updateComponentsFutureList.add(CompletableFuture.runAsync(
+                    () -> updateResourceGroupLimiter(namespacePolicies), 
MoreExecutors.directExecutor()));
+        }

Review Comment:
   > Oh, sorry. I thought it was able to get from somewhere before.
   Pass via the params is better. :)
   
   Sure, reverted



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to