poorbarcode commented on code in PR #21810:
URL: https://github.com/apache/pulsar/pull/21810#discussion_r1440055261


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3061,39 +3062,67 @@ public CompletableFuture<Void> 
onPoliciesUpdate(@Nonnull Policies data) {
             return CompletableFuture.completedFuture(null);
         }
 
+        // Update props.
+        // The component "EntryFilters" is updated in the method 
"updateTopicPolicyByNamespacePolicy(data)".
+        //   see more detail: https://github.com/apache/pulsar/pull/19364.
         updateTopicPolicyByNamespacePolicy(data);
         checkReplicatedSubscriptionControllerState();
         isEncryptionRequired = data.encryption_required;
-
         isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
 
-        updateDispatchRateLimiter();
-
-        updateSubscribeRateLimiter();
-
-        updatePublishDispatcher();
-
-        updateResourceGroupLimiter(data);
+        // Apply policies for components.
+        return FutureUtil.waitForAll(applyUpdatedPolicies(data))
+            .thenAccept(__ -> log.info("[{}] namespace-level policies updated 
successfully", topic))
+            .exceptionally(ex -> {
+                log.error("[{}] update namespace polices : {} error", 
this.getName(), data, ex);
+                throw FutureUtil.wrapToCompletionException(ex);
+            });
+    }
 
-        List<CompletableFuture<Void>> producerCheckFutures = new 
ArrayList<>(producers.size());
-        producers.values().forEach(producer -> producerCheckFutures.add(
-                
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
+    private List<CompletableFuture<Void>> applyUpdatedPolicies(Policies 
namespaceLevelPolicies) {
+        List<CompletableFuture<Void>> applyPoliciesFutureList = new 
ArrayList<>();
 
-        return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> 
{
-            return 
updateSubscriptionsDispatcherRateLimiter().thenCompose((___) -> {
-                replicators.forEach((name, replicator) -> 
replicator.updateRateLimiter());
-                shadowReplicators.forEach((name, replicator) -> 
replicator.updateRateLimiter());
-                checkMessageExpiry();
-                CompletableFuture<Void> replicationFuture = 
checkReplicationAndRetryOnFailure();
-                CompletableFuture<Void> dedupFuture = 
checkDeduplicationStatus();
-                CompletableFuture<Void> persistentPoliciesFuture = 
checkPersistencePolicies();
-                return CompletableFuture.allOf(replicationFuture, dedupFuture, 
persistentPoliciesFuture,
-                        preCreateSubscriptionForCompactionIfNeeded());
-            });
-        }).exceptionally(ex -> {
-            log.error("[{}] update namespace polices : {} error", 
this.getName(), data, ex);
-            throw FutureUtil.wrapToCompletionException(ex);
+        // Client permission check.
+        subscriptions.forEach((subName, sub) -> {
+            sub.getConsumers().forEach(consumer -> 
applyPoliciesFutureList.add(consumer.checkPermissionsAsync()));
         });
+        producers.values().forEach(producer -> applyPoliciesFutureList.add(
+                
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
+        // Check message expiry.
+        applyPoliciesFutureList.add(CompletableFuture.runAsync(
+                () -> checkMessageExpiry(), MoreExecutors.directExecutor()));
+
+        // Update rate limiters.
+        applyPoliciesFutureList.add(CompletableFuture.runAsync(
+                () -> updateDispatchRateLimiter(), 
MoreExecutors.directExecutor()));
+        applyPoliciesFutureList.add(CompletableFuture.runAsync(
+                () -> updateSubscribeRateLimiter(), 
MoreExecutors.directExecutor()));
+        applyPoliciesFutureList.add(CompletableFuture.runAsync(
+                () -> updatePublishRateLimiter(), 
MoreExecutors.directExecutor()));
+        if (namespaceLevelPolicies != null) {
+            applyPoliciesFutureList.add(CompletableFuture.runAsync(
+                    () -> updateResourceGroupLimiter(namespaceLevelPolicies), 
MoreExecutors.directExecutor()));
+        }
+        applyPoliciesFutureList.add(CompletableFuture.runAsync(
+                () -> updateSubscriptionsDispatcherRateLimiter(), 
MoreExecutors.directExecutor()));
+        applyPoliciesFutureList.add(CompletableFuture.runAsync(
+                () -> replicators.forEach((name, replicator) -> 
replicator.updateRateLimiter()),
+                MoreExecutors.directExecutor()));
+        applyPoliciesFutureList.add(CompletableFuture.runAsync(
+                () -> shadowReplicators.forEach((name, replicator) -> 
replicator.updateRateLimiter()),
+                MoreExecutors.directExecutor()));
+
+        // Other components.
+        applyPoliciesFutureList.add(CompletableFuture.runAsync(
+                () -> checkReplicationAndRetryOnFailure(), 
MoreExecutors.directExecutor()));
+        applyPoliciesFutureList.add(CompletableFuture.runAsync(
+                () -> checkDeduplicationStatus(), 
MoreExecutors.directExecutor()));
+        applyPoliciesFutureList.add(CompletableFuture.runAsync(
+                () -> checkPersistencePolicies(), 
MoreExecutors.directExecutor()));
+        applyPoliciesFutureList.add(CompletableFuture.runAsync(
+                () -> preCreateSubscriptionForCompactionIfNeeded(), 
MoreExecutors.directExecutor()));
+
+        return applyPoliciesFutureList;

Review Comment:
   Sure, I added a utility method to `FutureUtil`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to