Technoboy- commented on code in PR #15760:
URL: https://github.com/apache/pulsar/pull/15760#discussion_r881704437
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -751,45 +751,52 @@ protected void
internalRevokePermissionsOnSubscription(String subscriptionName,
}
}
- protected Set<String> internalGetNamespaceReplicationClusters() {
- validateNamespacePolicyOperation(namespaceName,
PolicyName.REPLICATION, PolicyOperation.READ);
-
- if (!namespaceName.isGlobal()) {
- throw new RestException(Status.PRECONDITION_FAILED,
- "Cannot get the replication clusters for a non-global
namespace");
- }
-
- Policies policies = getNamespacePolicies(namespaceName);
- return policies.replication_clusters;
+ protected CompletableFuture<Set<String>>
internalGetNamespaceReplicationClustersAsync() {
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.REPLICATION, PolicyOperation.READ)
+ .thenAccept(__ -> {
+ if (!namespaceName.isGlobal()) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Cannot get the replication clusters for a
non-global namespace");
+ }
+ }).thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenApply(policies -> policies.replication_clusters);
}
- protected void internalSetNamespaceReplicationClusters(List<String>
clusterIds) {
- validateNamespacePolicyOperation(namespaceName,
PolicyName.REPLICATION, PolicyOperation.WRITE);
- validatePoliciesReadOnlyAccess();
- checkNotNull(clusterIds, "ClusterIds should not be null");
-
- Set<String> replicationClusterSet = Sets.newHashSet(clusterIds);
- if (!namespaceName.isGlobal()) {
- throw new RestException(Status.PRECONDITION_FAILED, "Cannot set
replication on a non-global namespace");
- }
-
- if (replicationClusterSet.contains("global")) {
- throw new RestException(Status.PRECONDITION_FAILED,
- "Cannot specify global in the list of replication
clusters");
- }
-
- Set<String> clusters = clusters();
- for (String clusterId : replicationClusterSet) {
- if (!clusters.contains(clusterId)) {
- throw new RestException(Status.FORBIDDEN, "Invalid cluster id:
" + clusterId);
- }
- validatePeerClusterConflict(clusterId, replicationClusterSet);
- validateClusterForTenant(namespaceName.getTenant(), clusterId);
- }
- updatePolicies(namespaceName, policies ->{
- policies.replication_clusters = replicationClusterSet;
- return policies;
- });
+ @SuppressWarnings("checkstyle:WhitespaceAfter")
+ protected CompletableFuture<Void>
internalSetNamespaceReplicationClusters(List<String> clusterIds) {
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.REPLICATION, PolicyOperation.WRITE)
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenApply(__ -> {
+ checkNotNull(clusterIds, "ClusterIds should not be null");
+ if (!namespaceName.isGlobal()) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Cannot set replication on a non-global
namespace");
+ }
+ Set<String> replicationClusterSet =
Sets.newHashSet(clusterIds);
+ if (replicationClusterSet.contains("global")) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Cannot specify global in the list of
replication clusters");
+ }
+ return replicationClusterSet;
+ }).thenCompose(replicationClusterSet -> clustersAsync()
+ .thenCompose(clusters -> {
+ List<CompletableFuture<Void>> futures =
+
replicationClusterSet.stream().map(clusterId -> {
+ if (!clusters.contains(clusterId)) {
+ throw new
RestException(Status.FORBIDDEN,
+ "Invalid cluster id: " +
clusterId);
+ }
+ return
validatePeerClusterConflictAsync(clusterId, replicationClusterSet)
+ .thenCompose(__ ->
+
validateClusterForTenantAsync(
+
namespaceName.getTenant(), clusterId));
+ }).collect(Collectors.toList());
+ return FutureUtil.waitForAll(futures);
+ }).thenCompose(__ ->
CompletableFuture.completedFuture(replicationClusterSet)))
+ .thenCompose(replicationClusterSet ->
updatePoliciesAsync(namespaceName, policies -> {
Review Comment:
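The trailing `.thenCompose(__ -> CompletableFuture.completedFuture(replicationClusterSet))` is redundant: it wraps an already-available value in a completed future only to unwrap it again.
Map the value with `thenApply` inside the `clusters` lambda instead, and drop the extra `thenCompose` stage:
`return FutureUtil.waitForAll(futures).thenApply(__ -> replicationClusterSet);`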
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]