shibd commented on code in PR #15760:
URL: https://github.com/apache/pulsar/pull/15760#discussion_r882282106


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -751,45 +751,52 @@ protected void 
internalRevokePermissionsOnSubscription(String subscriptionName,
         }
     }
 
-    protected Set<String> internalGetNamespaceReplicationClusters() {
-        validateNamespacePolicyOperation(namespaceName, 
PolicyName.REPLICATION, PolicyOperation.READ);
-
-        if (!namespaceName.isGlobal()) {
-            throw new RestException(Status.PRECONDITION_FAILED,
-                    "Cannot get the replication clusters for a non-global 
namespace");
-        }
-
-        Policies policies = getNamespacePolicies(namespaceName);
-        return policies.replication_clusters;
+    protected CompletableFuture<Set<String>> 
internalGetNamespaceReplicationClustersAsync() {
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.REPLICATION, PolicyOperation.READ)
+                .thenAccept(__ -> {
+                    if (!namespaceName.isGlobal()) {
+                        throw new RestException(Status.PRECONDITION_FAILED,
+                                "Cannot get the replication clusters for a 
non-global namespace");
+                    }
+                }).thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenApply(policies -> policies.replication_clusters);
     }
 
-    protected void internalSetNamespaceReplicationClusters(List<String> 
clusterIds) {
-        validateNamespacePolicyOperation(namespaceName, 
PolicyName.REPLICATION, PolicyOperation.WRITE);
-        validatePoliciesReadOnlyAccess();
-        checkNotNull(clusterIds, "ClusterIds should not be null");
-
-        Set<String> replicationClusterSet = Sets.newHashSet(clusterIds);
-        if (!namespaceName.isGlobal()) {
-            throw new RestException(Status.PRECONDITION_FAILED, "Cannot set 
replication on a non-global namespace");
-        }
-
-        if (replicationClusterSet.contains("global")) {
-            throw new RestException(Status.PRECONDITION_FAILED,
-                    "Cannot specify global in the list of replication 
clusters");
-        }
-
-        Set<String> clusters = clusters();
-        for (String clusterId : replicationClusterSet) {
-            if (!clusters.contains(clusterId)) {
-                throw new RestException(Status.FORBIDDEN, "Invalid cluster id: 
" + clusterId);
-            }
-            validatePeerClusterConflict(clusterId, replicationClusterSet);
-            validateClusterForTenant(namespaceName.getTenant(), clusterId);
-        }
-        updatePolicies(namespaceName, policies ->{
-            policies.replication_clusters = replicationClusterSet;
-            return policies;
-        });
+    @SuppressWarnings("checkstyle:WhitespaceAfter")
+    protected CompletableFuture<Void> 
internalSetNamespaceReplicationClusters(List<String> clusterIds) {
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.REPLICATION, PolicyOperation.WRITE)
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenApply(__ -> {
+                    checkNotNull(clusterIds, "ClusterIds should not be null");
+                    if (!namespaceName.isGlobal()) {
+                        throw new RestException(Status.PRECONDITION_FAILED,
+                                "Cannot set replication on a non-global 
namespace");
+                    }
+                    Set<String> replicationClusterSet = 
Sets.newHashSet(clusterIds);
+                    if (replicationClusterSet.contains("global")) {
+                        throw new RestException(Status.PRECONDITION_FAILED,
+                                "Cannot specify global in the list of 
replication clusters");
+                    }
+                    return replicationClusterSet;
+                }).thenCompose(replicationClusterSet -> clustersAsync()
+                        .thenCompose(clusters -> {
+                            List<CompletableFuture<Void>> futures =
+                                    
replicationClusterSet.stream().map(clusterId -> {
+                                        if (!clusters.contains(clusterId)) {
+                                            throw new 
RestException(Status.FORBIDDEN,
+                                                    "Invalid cluster id: " + 
clusterId);
+                                        }
+                                        return 
validatePeerClusterConflictAsync(clusterId, replicationClusterSet)
+                                                .thenCompose(__ ->
+                                                        
validateClusterForTenantAsync(
+                                                                
namespaceName.getTenant(), clusterId));
+                                    }).collect(Collectors.toList());
+                            return FutureUtil.waitForAll(futures);
+                        }).thenCompose(__ -> 
CompletableFuture.completedFuture(replicationClusterSet)))
+                .thenCompose(replicationClusterSet -> 
updatePoliciesAsync(namespaceName, policies -> {

Review Comment:
   Thank review, Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to