This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e041fab73c1 [fix][admin] Refactor namespace anti affinity group sync operations to async in rest api (#25086)
e041fab73c1 is described below

commit e041fab73c1b9a6bb93457d186fc0a937beafd53
Author: Oneby Wang <[email protected]>
AuthorDate: Wed Dec 17 19:16:20 2025 +0800

    [fix][admin] Refactor namespace anti affinity group sync operations to async in rest api (#25086)
    
    Co-authored-by: oneby-wang <[email protected]>
---
 .../broker/resources/LocalPoliciesResources.java   |   6 +-
 .../apache/pulsar/broker/admin/AdminResource.java  |  17 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 186 +++++++--------------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |  55 ++++--
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  58 +++++--
 .../apache/pulsar/broker/admin/NamespacesTest.java |  80 +++++++++
 .../pulsar/broker/admin/NamespacesV2Test.java      |  95 ++++++++++-
 7 files changed, 332 insertions(+), 165 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
index b7ef19ccbe8..8e7b0ab0b1e 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
@@ -37,9 +37,9 @@ public class LocalPoliciesResources extends 
BaseResources<LocalPolicies> {
         super(localStore, LocalPolicies.class, operationTimeoutSec);
     }
 
-    public void setLocalPolicies(NamespaceName ns, Function<LocalPolicies, 
LocalPolicies> modifyFunction)
-            throws MetadataStoreException {
-        set(joinPath(LOCAL_POLICIES_ROOT, ns.toString()), modifyFunction);
+    public CompletableFuture<Void> setLocalPoliciesAsync(NamespaceName ns,
+                                                         
Function<LocalPolicies, LocalPolicies> modifyFunction) {
+        return setAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString()), 
modifyFunction);
     }
 
     public Optional<LocalPolicies> getLocalPolicies(NamespaceName ns) throws 
MetadataStoreException{
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index bfa1fdc812b..9ea5b4c33cc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.errorprone.annotations.CanIgnoreReturnValue;
@@ -498,14 +499,12 @@ public abstract class AdminResource extends 
PulsarWebResource {
                 });
     }
 
-   protected void validateClusterExists(String cluster) {
-        try {
-            if (!clusterResources().getCluster(cluster).isPresent()) {
+    protected CompletableFuture<Void> validateClusterExistsAsync(String 
cluster) {
+        return 
clusterResources().clusterExistsAsync(cluster).thenAccept(clusterExist -> {
+            if (!clusterExist) {
                 throw new RestException(Status.PRECONDITION_FAILED, "Cluster " 
+ cluster + " does not exist.");
             }
-        } catch (Exception e) {
-            throw new RestException(e);
-        }
+        });
     }
 
     protected Policies getNamespacePolicies(String tenant, String cluster, 
String namespace) {
@@ -874,6 +873,12 @@ public abstract class AdminResource extends 
PulsarWebResource {
         }
     }
 
+    protected void checkNotBlank(String str, String errorMessage) {
+        if (isBlank(str)) {
+            throw new RestException(Status.PRECONDITION_FAILED, errorMessage);
+        }
+    }
+
     protected boolean isManagedLedgerNotFoundException(Throwable cause) {
         return cause instanceof 
ManagedLedgerException.MetadataNotFoundException
                 || cause instanceof MetadataStoreException.NotFoundException;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0cb4f9a493d..bfbd6523481 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
-import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
@@ -1038,35 +1037,6 @@ public abstract class NamespacesBase extends 
AdminResource {
         return internalSetBookieAffinityGroupAsync(null);
     }
 
-    @Deprecated
-    protected BookieAffinityGroupData internalGetBookieAffinityGroup() {
-        validateSuperUserAccess();
-
-        if (namespaceName.isGlobal()) {
-            // check cluster ownership for a given global namespace: redirect 
if peer-cluster owns it
-            validateGlobalNamespaceOwnership(namespaceName);
-        } else {
-            validateClusterOwnership(namespaceName.getCluster());
-            validateClusterForTenant(namespaceName.getTenant(), 
namespaceName.getCluster());
-        }
-        try {
-            final BookieAffinityGroupData bookkeeperAffinityGroup = 
getLocalPolicies().getLocalPolicies(namespaceName)
-                    .orElseThrow(() -> new RestException(Status.NOT_FOUND,
-                            "Namespace local-policies does not 
exist")).bookieAffinityGroup;
-            return bookkeeperAffinityGroup;
-        } catch (NotFoundException e) {
-            log.warn("[{}] Failed to get local-policy configuration for 
namespace {}: does not exist",
-                    clientAppId(), namespaceName);
-            throw new RestException(Status.NOT_FOUND, "Namespace policies does 
not exist");
-        } catch (RestException re) {
-            throw re;
-        } catch (Exception e) {
-            log.error("[{}] Failed to get local-policy configuration for 
namespace {}", clientAppId(),
-                    namespaceName, e);
-            throw new RestException(e);
-        }
-    }
-
     protected CompletableFuture<BookieAffinityGroupData> 
internalGetBookieAffinityGroupAsync() {
         return validateSuperUserAccessAsync().thenCompose(__ -> {
             if (namespaceName.isGlobal()) {
@@ -1077,9 +1047,8 @@ public abstract class NamespacesBase extends 
AdminResource {
                         unused -> 
validateClusterForTenantAsync(namespaceName.getTenant(), 
namespaceName.getCluster()));
             }
         }).thenCompose(__ -> 
getLocalPolicies().getLocalPoliciesAsync(namespaceName))
-                .thenApply(policies -> policies.orElseThrow(
-                        () -> new RestException(Status.NOT_FOUND, "Namespace 
local-policies does not exist"))
-                        .bookieAffinityGroup);
+                .thenApply(policies -> policies.orElseThrow(() -> new 
RestException(Status.NOT_FOUND,
+                        "Namespace local-policies does not 
exist")).bookieAffinityGroup);
     }
 
     private CompletableFuture<Void> validateLeaderBrokerAsync() {
@@ -1842,104 +1811,69 @@ public abstract class NamespacesBase extends 
AdminResource {
         internalSetPolicies("delayed_delivery_policies", 
delayedDeliveryPolicies);
     }
 
-    protected void internalSetNamespaceAntiAffinityGroup(String 
antiAffinityGroup) {
-        validateNamespacePolicyOperation(namespaceName, 
PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
-        checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be 
null");
-        validatePoliciesReadOnlyAccess();
-
-        log.info("[{}] Setting anti-affinity group {} for {}", clientAppId(), 
antiAffinityGroup, namespaceName);
-
-        if (isBlank(antiAffinityGroup)) {
-            throw new RestException(Status.PRECONDITION_FAILED, 
"antiAffinityGroup can't be empty");
-        }
-
-        try {
-            getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, (lp)->
-                lp.map(policies -> new LocalPolicies(policies.bundles,
-                        policies.bookieAffinityGroup,
-                        antiAffinityGroup,
-                        policies.migrated))
-                        .orElseGet(() -> new 
LocalPolicies(getDefaultBundleData(), null, antiAffinityGroup))
-            );
-            log.info("[{}] Successfully updated local-policies configuration: 
namespace={}, map={}", clientAppId(),
-                    namespaceName, antiAffinityGroup);
-        } catch (RestException re) {
-            throw re;
-        } catch (Exception e) {
-            log.error("[{}] Failed to update local-policy configuration for 
namespace {}", clientAppId(), namespaceName,
-                    e);
-            throw new RestException(e);
-        }
-    }
-
-    protected String internalGetNamespaceAntiAffinityGroup() {
-        validateNamespacePolicyOperation(namespaceName, 
PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
-
-        try {
-            return getLocalPolicies()
-                    .getLocalPolicies(namespaceName)
-                    .orElseGet(() -> new 
LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles())
-                            , null, null)).namespaceAntiAffinityGroup;
-        } catch (Exception e) {
-            log.error("[{}] Failed to get the antiAffinityGroup of namespace 
{}", clientAppId(), namespaceName, e);
-            throw new RestException(Status.NOT_FOUND, "Couldn't find namespace 
policies");
-        }
-    }
-
-    protected void internalRemoveNamespaceAntiAffinityGroup() {
-        validateNamespacePolicyOperation(namespaceName, 
PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
-        validatePoliciesReadOnlyAccess();
-
-        log.info("[{}] Deleting anti-affinity group for {}", clientAppId(), 
namespaceName);
-
-        try {
-            getLocalPolicies().setLocalPolicies(namespaceName, (policies)->
-                new LocalPolicies(policies.bundles,
-                        policies.bookieAffinityGroup,
-                        null,
-                        policies.migrated));
-            log.info("[{}] Successfully removed anti-affinity group for a 
namespace={}", clientAppId(), namespaceName);
-        } catch (Exception e) {
-            log.error("[{}] Failed to remove anti-affinity group for namespace 
{}", clientAppId(), namespaceName, e);
-            throw new RestException(e);
-        }
+    protected CompletableFuture<Void> 
internalSetNamespaceAntiAffinityGroupAsync(String antiAffinityGroup) {
+        checkNotNull(antiAffinityGroup, "Anti-affinity group should not be 
null");
+        checkNotBlank(antiAffinityGroup, "Anti-affinity group can't be empty");
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE)
+                .thenCompose(__ -> 
validatePoliciesReadOnlyAccessAsync()).thenCompose(
+                        __ -> getDefaultBundleDataAsync().thenCompose(
+                                defaultBundleData -> 
getLocalPolicies().setLocalPoliciesWithCreateAsync(namespaceName,
+                                        oldPolicies -> 
oldPolicies.map(policies -> new LocalPolicies(policies.bundles,
+                                                        
policies.bookieAffinityGroup, antiAffinityGroup,
+                                                        policies.migrated))
+                                                .orElseGet(() -> new 
LocalPolicies(defaultBundleData, null,
+                                                        antiAffinityGroup)))))
+                .thenAccept(__ -> log.info(
+                        "[{}] Successfully updated namespace anti-affinity 
group, namespace={}, anti-affinity"
+                                + " group={}", clientAppId(), namespaceName, 
antiAffinityGroup));
+    }
+
+    protected CompletableFuture<String> 
internalGetNamespaceAntiAffinityGroupAsync() {
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.ANTI_AFFINITY, PolicyOperation.READ)
+                .thenCompose(__ -> 
getLocalPolicies().getLocalPoliciesAsync(namespaceName)
+                .thenApply(policiesOpt -> policiesOpt.map(localPolicies -> 
localPolicies.namespaceAntiAffinityGroup)
+                        .orElse(null)));
+    }
+
+    protected CompletableFuture<Void> 
internalRemoveNamespaceAntiAffinityGroupAsync() {
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE)
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> {
+                    log.info("[{}] Removing anti-affinity group for namespace: 
{}", clientAppId(), namespaceName);
+                    return 
getLocalPolicies().setLocalPoliciesAsync(namespaceName,
+                            (policies) -> new LocalPolicies(policies.bundles, 
policies.bookieAffinityGroup, null,
+                                    policies.migrated));
+                })
+                .thenAccept(__ -> log.info("[{}] Successfully removed 
anti-affinity group for namespace: {}",
+                        clientAppId(), namespaceName));
     }
 
-    protected List<String> internalGetAntiAffinityNamespaces(String cluster, 
String antiAffinityGroup,
-                                                             String tenant) {
-        validateNamespacePolicyOperation(namespaceName, 
PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
+    protected CompletableFuture<List<String>> 
internalGetAntiAffinityNamespacesAsync(String cluster,
+                                                                               
      String antiAffinityGroup,
+                                                                               
      String tenant) {
         checkNotNull(cluster, "Cluster should not be null");
-        checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be 
null");
+        checkNotNull(antiAffinityGroup, "Anti-affinity group should not be 
null");
         checkNotNull(tenant, "Tenant should not be null");
+        checkNotBlank(antiAffinityGroup, "Anti-affinity group can't be empty");
 
-        log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), 
tenant, antiAffinityGroup, cluster);
-
-        if (isBlank(antiAffinityGroup)) {
-            throw new RestException(Status.PRECONDITION_FAILED, "anti-affinity 
group can't be empty.");
-        }
-        validateClusterExists(cluster);
-
-        try {
-            List<String> namespaces = 
tenantResources().getListOfNamespaces(tenant);
-
-            return namespaces.stream().filter(ns -> {
-                Optional<LocalPolicies> policies;
-                try {
-                    policies = 
getLocalPolicies().getLocalPolicies(NamespaceName.get(ns));
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-
-                String storedAntiAffinityGroup = policies.orElseGet(() ->
-                        new 
LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
-                                null, null)).namespaceAntiAffinityGroup;
-                return 
antiAffinityGroup.equalsIgnoreCase(storedAntiAffinityGroup);
-            }).collect(Collectors.toList());
-
-        } catch (Exception e) {
-            log.warn("Failed to list of properties/namespace from global-zk", 
e);
-            throw new RestException(e);
-        }
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.ANTI_AFFINITY, PolicyOperation.READ)
+                .thenCompose(__ -> validateClusterExistsAsync(cluster))
+                .thenCompose(__ -> {
+                    log.info("[{}]-{} Finding namespaces for {} in {}", 
clientAppId(), tenant, antiAffinityGroup,
+                            cluster);
+                    return 
tenantResources().getListOfNamespacesAsync(tenant).thenCompose(namespaces -> {
+                        List<CompletableFuture<String>> nsFutures = 
namespaces.stream()
+                                .map(ns -> 
getLocalPolicies().getLocalPoliciesAsync(NamespaceName.get(ns))
+                                        .thenApply(policiesOpt -> 
policiesOpt.map(
+                                                localPolicies -> 
localPolicies.namespaceAntiAffinityGroup).orElse(null))
+                                        
.thenApply(antiAffinityGroup::equalsIgnoreCase)
+                                        .thenApply(equals -> equals ? ns : 
null)).toList();
+                        CompletableFuture<Void> allFuture = 
FutureUtil.waitForAll(nsFutures);
+                        return allFuture.thenApply(
+                                unused -> 
nsFutures.stream().map(CompletableFuture::join).filter(Objects::nonNull)
+                                        .toList());
+                    });
+                });
     }
 
     private boolean checkQuotas(Policies policies, RetentionPolicies 
retention) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index c6b8dbf5d2d..cad6899c8a2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -586,10 +586,19 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist"),
             @ApiResponse(code = 412, message = "Invalid antiAffinityGroup") })
-    public void setNamespaceAntiAffinityGroup(@PathParam("property") String 
property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace, String antiAffinityGroup) {
+    public void setNamespaceAntiAffinityGroup(@Suspended AsyncResponse 
asyncResponse,
+                                              @PathParam("property") String 
property,
+                                              @PathParam("cluster") String 
cluster,
+                                              @PathParam("namespace") String 
namespace, String antiAffinityGroup) {
         validateNamespaceName(property, cluster, namespace);
-        internalSetNamespaceAntiAffinityGroup(antiAffinityGroup);
+        internalSetNamespaceAntiAffinityGroupAsync(antiAffinityGroup)
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to set namespace anti-affinity 
group, tenant: {}, namespace: {}, "
+                            + "antiAffinityGroup: {}", clientAppId(), 
property, namespace, antiAffinityGroup, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -597,10 +606,19 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get anti-affinity group of a namespace.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist") })
-    public String getNamespaceAntiAffinityGroup(@PathParam("property") String 
property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace) {
+    public void getNamespaceAntiAffinityGroup(@Suspended AsyncResponse 
asyncResponse,
+                                              @PathParam("property") String 
property,
+                                              @PathParam("cluster") String 
cluster,
+                                              @PathParam("namespace") String 
namespace) {
         validateNamespaceName(property, cluster, namespace);
-        return internalGetNamespaceAntiAffinityGroup();
+        internalGetNamespaceAntiAffinityGroupAsync()
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get namespace anti-affinity 
group, tenant: {}, namespace: {}",
+                            clientAppId(), property, namespace, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -609,10 +627,19 @@ public class Namespaces extends NamespacesBase {
             + " api can be only accessed by admin of any of the existing 
property")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 412, message = "Cluster not 
exist/Anti-affinity group can't be empty.")})
-    public List<String> getAntiAffinityNamespaces(@PathParam("cluster") String 
cluster,
+    public void getAntiAffinityNamespaces(@Suspended AsyncResponse 
asyncResponse,
+                                                  @PathParam("cluster") String 
cluster,
                                                   @PathParam("group") String 
antiAffinityGroup,
                                                   @QueryParam("property") 
String property) {
-        return internalGetAntiAffinityNamespaces(cluster, antiAffinityGroup, 
property);
+        internalGetAntiAffinityNamespacesAsync(cluster, antiAffinityGroup, 
property)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get all namespaces in cluster of 
given anti-affinity group, cluster: {}, "
+                            + "tenant: {}, antiAffinityGroup: {}", 
clientAppId(), cluster, property, antiAffinityGroup,
+                            ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
@@ -621,11 +648,19 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist"),
             @ApiResponse(code = 409, message = "Concurrent modification")})
-    public void removeNamespaceAntiAffinityGroup(@PathParam("property") String 
property,
+    public void removeNamespaceAntiAffinityGroup(@Suspended AsyncResponse 
asyncResponse,
+                                                 @PathParam("property") String 
property,
                                                  @PathParam("cluster") String 
cluster,
                                                  @PathParam("namespace") 
String namespace) {
         validateNamespaceName(property, cluster, namespace);
-        internalRemoveNamespaceAntiAffinityGroup();
+        internalRemoveNamespaceAntiAffinityGroupAsync()
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to remove namespace anti-affinity 
group, tenant: {}, namespace: {}",
+                            clientAppId(), property, namespace, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 62bf5cd7aea..90f4b087bfe 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -2138,13 +2138,21 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist"),
             @ApiResponse(code = 412, message = "Invalid antiAffinityGroup")})
-    public void setNamespaceAntiAffinityGroup(@PathParam("tenant") String 
tenant,
+    public void setNamespaceAntiAffinityGroup(@Suspended AsyncResponse 
asyncResponse,
+                                              @PathParam("tenant") String 
tenant,
                                               @PathParam("namespace") String 
namespace,
                                               @ApiParam(value = "Anti-affinity 
group for the specified namespace",
                                                       required = true)
-                                                          String 
antiAffinityGroup) {
+                                              String antiAffinityGroup) {
         validateNamespaceName(tenant, namespace);
-        internalSetNamespaceAntiAffinityGroup(antiAffinityGroup);
+        internalSetNamespaceAntiAffinityGroupAsync(antiAffinityGroup)
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to set namespace anti-affinity 
group, tenant: {}, namespace: {}, "
+                            + "antiAffinityGroup: {}", clientAppId(), tenant, 
namespace, antiAffinityGroup, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -2152,10 +2160,18 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get anti-affinity group of a namespace.", response 
= String.class)
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist") })
-    public String getNamespaceAntiAffinityGroup(@PathParam("tenant") String 
tenant,
-            @PathParam("namespace") String namespace) {
+    public void getNamespaceAntiAffinityGroup(@Suspended AsyncResponse 
asyncResponse,
+                                                @PathParam("tenant") String 
tenant,
+                                                @PathParam("namespace") String 
namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetNamespaceAntiAffinityGroup();
+        internalGetNamespaceAntiAffinityGroupAsync()
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get namespace anti-affinity 
group, tenant: {}, namespace: {}",
+                            clientAppId(), tenant, namespace, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
@@ -2166,10 +2182,18 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist"),
             @ApiResponse(code = 409, message = "Concurrent modification") })
-    public void removeNamespaceAntiAffinityGroup(@PathParam("tenant") String 
tenant,
-            @PathParam("namespace") String namespace) {
+    public void removeNamespaceAntiAffinityGroup(@Suspended AsyncResponse 
asyncResponse,
+                                                 @PathParam("tenant") String 
tenant,
+                                                 @PathParam("namespace") 
String namespace) {
         validateNamespaceName(tenant, namespace);
-        internalRemoveNamespaceAntiAffinityGroup();
+        internalRemoveNamespaceAntiAffinityGroupAsync()
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to remove namespace anti-affinity 
group, tenant: {}, namespace: {}",
+                            clientAppId(), tenant, namespace, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -2179,9 +2203,19 @@ public class Namespaces extends NamespacesBase {
             response = String.class, responseContainer = "List")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 412, message = "Cluster not 
exist/Anti-affinity group can't be empty.")})
-    public List<String> getAntiAffinityNamespaces(@PathParam("cluster") String 
cluster,
-            @PathParam("group") String antiAffinityGroup, 
@QueryParam("tenant") String tenant) {
-        return internalGetAntiAffinityNamespaces(cluster, antiAffinityGroup, 
tenant);
+    public void getAntiAffinityNamespaces(@Suspended AsyncResponse 
asyncResponse,
+                                                  @PathParam("cluster") String 
cluster,
+                                                  @PathParam("group") String 
antiAffinityGroup,
+                                                  @QueryParam("tenant") String 
tenant) {
+        internalGetAntiAffinityNamespacesAsync(cluster, antiAffinityGroup, 
tenant)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get all namespaces in cluster of 
given anti-affinity group, cluster: {}, "
+                            + "tenant: {}, antiAffinityGroup: {}", 
clientAppId(), cluster, tenant, antiAffinityGroup,
+                            ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 2aaa5676d5b..2f5b99bfd9d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -2433,4 +2433,84 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
                         setBookieAffinityGroupNs));
         assertNull(bookieAffinityGroupDataResp);
     }
+
+    @Test
+    public void testSetAndDeleteNamespaceAntiAffinityGroup() throws Exception {
+        // 1. create namespace with empty policies, namespace anti affinity 
group should be null
+        String setNamespaceAntiAffinityGroupNs = 
"test-set-namespace-anti-affinity-group-ns";
+        asyncRequests(response -> namespaces.createNamespace(response, 
testTenant, testLocalCluster,
+                setNamespaceAntiAffinityGroupNs, (Policies) null));
+        String namespaceAntiAffinityGroupResp = (String) asyncRequests(
+                response -> namespaces.getNamespaceAntiAffinityGroup(response, 
testTenant, testLocalCluster,
+                        setNamespaceAntiAffinityGroupNs));
+        assertNull(namespaceAntiAffinityGroupResp);
+
+        // 2.set namespace anti affinity group
+        String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
+        asyncRequests(response -> 
namespaces.setNamespaceAntiAffinityGroup(response, testTenant, testLocalCluster,
+                setNamespaceAntiAffinityGroupNs, 
namespaceAntiAffinityGroupReq));
+
+        // 3.assert namespace anti affinity group
+        namespaceAntiAffinityGroupResp = (String) asyncRequests(
+                response -> namespaces.getNamespaceAntiAffinityGroup(response, 
testTenant, testLocalCluster,
+                        setNamespaceAntiAffinityGroupNs));
+        assertEquals(namespaceAntiAffinityGroupResp, 
namespaceAntiAffinityGroupReq);
+
+        // 4.delete namespace anti affinity group
+        asyncRequests(response -> 
namespaces.removeNamespaceAntiAffinityGroup(response, testTenant, 
testLocalCluster,
+                setNamespaceAntiAffinityGroupNs));
+
+        // 5.assert namespace anti affinity group
+        namespaceAntiAffinityGroupResp = (String) asyncRequests(
+                response -> namespaces.getNamespaceAntiAffinityGroup(response, 
testTenant, testLocalCluster,
+                        setNamespaceAntiAffinityGroupNs));
+        assertNull(namespaceAntiAffinityGroupResp);
+    }
+
+    @Test
+    public void testGetClusterAntiAffinityNamespaces() throws Exception {
+        // create 5 namespaces, 3 namespaces are set to the same namespace 
anti affinity group,
+        // 2 namespaces are not set to any anti affinity group
+        String namespaceWithAntiAffinity1 = "namespace-with-anti-affinity-1";
+        String namespaceWithAntiAffinity2 = "namespace-with-anti-affinity-2";
+        String namespaceWithAntiAffinity3 = "namespace-with-anti-affinity-3";
+        String namespaceWithoutAntiAffinity1 = 
"namespace-without-anti-affinity-1";
+        String namespaceWithoutAntiAffinity2 = 
"namespace-without-anti-affinity-2";
+
+        // create namespaces
+        List<String> allNamespaces =
+                List.of(namespaceWithAntiAffinity1, 
namespaceWithAntiAffinity2, namespaceWithAntiAffinity3,
+                        namespaceWithoutAntiAffinity1, 
namespaceWithoutAntiAffinity2);
+        for (String namespace : allNamespaces) {
+            asyncRequests(response -> namespaces.createNamespace(response, 
testTenant, testLocalCluster, namespace,
+                    (Policies) null));
+        }
+
+        // set namespace anti affinity group
+        String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
+        List<String> namespacesWithAntiAffinityGroup =
+                List.of(namespaceWithAntiAffinity1, 
namespaceWithAntiAffinity2, namespaceWithAntiAffinity3);
+        for (String namespace : namespacesWithAntiAffinityGroup) {
+            asyncRequests(response -> 
namespaces.setNamespaceAntiAffinityGroup(response, testTenant, testLocalCluster,
+                    namespace, namespaceAntiAffinityGroupReq));
+        }
+
+        // assert namespace anti affinity group
+        for (String namespace : namespacesWithAntiAffinityGroup) {
+            String namespaceAntiAffinityGroupResp = (String) asyncRequests(
+                    response -> 
namespaces.getNamespaceAntiAffinityGroup(response, testTenant, testLocalCluster,
+                            namespace));
+            assertEquals(namespaceAntiAffinityGroupResp, 
namespaceAntiAffinityGroupReq);
+        }
+
+        // get namespaces in cluster of given anti affinity group
+        List<String> namespacesResp = (List<String>) asyncRequests(
+                response -> namespaces.getAntiAffinityNamespaces(response, 
testLocalCluster,
+                        namespaceAntiAffinityGroupReq, testTenant));
+        List<String> namespacesWithFullPath =
+                namespacesWithAntiAffinityGroup.stream().map(ns -> 
NamespaceName.get(testTenant, testLocalCluster, ns))
+                        .map(NamespaceName::toString).toList();
+        assertEquals(namespacesResp, namespacesWithFullPath);
+    }
+
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
index 629e92c056f..596cfa3f396 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
@@ -305,8 +305,8 @@ public class NamespacesV2Test extends 
MockedPulsarServiceBaseTest {
 
         // 2.set namespace anti affinity group
         String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
-        namespaces.setNamespaceAntiAffinityGroup(testTenant, 
setNamespaceAntiAffinityGroupNs,
-                namespaceAntiAffinityGroupReq);
+        asyncRequests(response -> 
namespaces.setNamespaceAntiAffinityGroup(response, testTenant,
+                setNamespaceAntiAffinityGroupNs, 
namespaceAntiAffinityGroupReq));
 
         // 3.query namespace num bundles, should be 
conf.getDefaultNumberOfNamespaceBundles()
         BundlesData bundlesData = (BundlesData) asyncRequests(
@@ -314,8 +314,9 @@ public class NamespacesV2Test extends 
MockedPulsarServiceBaseTest {
         assertEquals(bundlesData.getNumBundles(), 
conf.getDefaultNumberOfNamespaceBundles());
 
         // 4.assert namespace anti affinity group
-        String namespaceAntiAffinityGroupResp =
-                namespaces.getNamespaceAntiAffinityGroup(testTenant, 
setNamespaceAntiAffinityGroupNs);
+        String namespaceAntiAffinityGroupResp = (String) asyncRequests(
+                response -> namespaces.getNamespaceAntiAffinityGroup(response, 
testTenant,
+                        setNamespaceAntiAffinityGroupNs));
         assertEquals(namespaceAntiAffinityGroupResp, 
namespaceAntiAffinityGroupReq);
     }
 
@@ -330,8 +331,8 @@ public class NamespacesV2Test extends 
MockedPulsarServiceBaseTest {
 
         // 2.set namespace anti affinity group
         String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
-        namespaces.setNamespaceAntiAffinityGroup(testTenant, 
setNamespaceAntiAffinityGroupNs,
-                namespaceAntiAffinityGroupReq);
+        asyncRequests(response -> 
namespaces.setNamespaceAntiAffinityGroup(response, testTenant,
+                setNamespaceAntiAffinityGroupNs, 
namespaceAntiAffinityGroupReq));
 
         // 3.query namespace num bundles, should be policies.bundles, which we 
set before
         BundlesData bundlesData = (BundlesData) asyncRequests(
@@ -339,8 +340,9 @@ public class NamespacesV2Test extends 
MockedPulsarServiceBaseTest {
         assertEquals(bundlesData, policies.bundles);
 
         // 4.assert namespace anti affinity group
-        String namespaceAntiAffinityGroupResp =
-                namespaces.getNamespaceAntiAffinityGroup(testTenant, 
setNamespaceAntiAffinityGroupNs);
+        String namespaceAntiAffinityGroupResp = (String) asyncRequests(
+                response -> namespaces.getNamespaceAntiAffinityGroup(response, 
testTenant,
+                        setNamespaceAntiAffinityGroupNs));
         assertEquals(namespaceAntiAffinityGroupResp, 
namespaceAntiAffinityGroupReq);
     }
 
@@ -418,4 +420,81 @@ public class NamespacesV2Test extends 
MockedPulsarServiceBaseTest {
         assertNull(bookieAffinityGroupDataResp);
     }
 
+    @Test
+    public void testSetAndDeleteNamespaceAntiAffinityGroup() throws Exception {
+        // 1. create namespace with empty policies, namespace anti affinity 
group should be null
+        String setNamespaceAntiAffinityGroupNs = 
"test-set-namespace-anti-affinity-group-ns";
+        asyncRequests(
+                response -> namespaces.createNamespace(response, testTenant, 
setNamespaceAntiAffinityGroupNs, null));
+        String namespaceAntiAffinityGroupResp = (String) asyncRequests(
+                response -> namespaces.getNamespaceAntiAffinityGroup(response, 
testTenant,
+                        setNamespaceAntiAffinityGroupNs));
+        assertNull(namespaceAntiAffinityGroupResp);
+
+        // 2.set namespace anti affinity group
+        String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
+        asyncRequests(response -> 
namespaces.setNamespaceAntiAffinityGroup(response, testTenant,
+                setNamespaceAntiAffinityGroupNs, 
namespaceAntiAffinityGroupReq));
+
+        // 3.assert namespace anti affinity group
+        namespaceAntiAffinityGroupResp = (String) asyncRequests(
+                response -> namespaces.getNamespaceAntiAffinityGroup(response, 
testTenant,
+                        setNamespaceAntiAffinityGroupNs));
+        assertEquals(namespaceAntiAffinityGroupResp, 
namespaceAntiAffinityGroupReq);
+
+        // 4.delete namespace anti affinity group
+        asyncRequests(response -> 
namespaces.removeNamespaceAntiAffinityGroup(response, testTenant,
+                setNamespaceAntiAffinityGroupNs));
+
+        // 5.assert namespace anti affinity group
+        namespaceAntiAffinityGroupResp = (String) asyncRequests(
+                response -> namespaces.getNamespaceAntiAffinityGroup(response, 
testTenant,
+                        setNamespaceAntiAffinityGroupNs));
+        assertNull(namespaceAntiAffinityGroupResp);
+    }
+
+    @Test
+    public void testGetClusterAntiAffinityNamespaces() throws Exception {
+        // create 5 namespaces, 3 namespaces are set to the same namespace 
anti affinity group,
+        // 2 namespaces are not set to any anti affinity group
+        String namespaceWithAntiAffinity1 = "namespace-with-anti-affinity-1";
+        String namespaceWithAntiAffinity2 = "namespace-with-anti-affinity-2";
+        String namespaceWithAntiAffinity3 = "namespace-with-anti-affinity-3";
+        String namespaceWithoutAntiAffinity1 = 
"namespace-without-anti-affinity-1";
+        String namespaceWithoutAntiAffinity2 = 
"namespace-without-anti-affinity-2";
+
+        // create namespaces
+        List<String> allNamespaces =
+                List.of(namespaceWithAntiAffinity1, 
namespaceWithAntiAffinity2, namespaceWithAntiAffinity3,
+                        namespaceWithoutAntiAffinity1, 
namespaceWithoutAntiAffinity2);
+        for (String namespace : allNamespaces) {
+            asyncRequests(response -> namespaces.createNamespace(response, 
testTenant, namespace, null));
+        }
+
+        // set namespace anti affinity group
+        String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
+        List<String> namespacesWithAntiAffinityGroup =
+                List.of(namespaceWithAntiAffinity1, 
namespaceWithAntiAffinity2, namespaceWithAntiAffinity3);
+        for (String namespace : namespacesWithAntiAffinityGroup) {
+            asyncRequests(response -> 
namespaces.setNamespaceAntiAffinityGroup(response, testTenant, namespace,
+                    namespaceAntiAffinityGroupReq));
+        }
+
+        // assert namespace anti affinity group
+        for (String namespace : namespacesWithAntiAffinityGroup) {
+            String namespaceAntiAffinityGroupResp = (String) asyncRequests(
+                    response -> 
namespaces.getNamespaceAntiAffinityGroup(response, testTenant, namespace));
+            assertEquals(namespaceAntiAffinityGroupResp, 
namespaceAntiAffinityGroupReq);
+        }
+
+        // get namespaces in cluster of given anti affinity group
+        List<String> namespacesResp = (List<String>) asyncRequests(
+                response -> namespaces.getAntiAffinityNamespaces(response, 
testLocalCluster,
+                        namespaceAntiAffinityGroupReq, testTenant));
+        List<String> namespacesWithFullPath =
+                namespacesWithAntiAffinityGroup.stream().map(ns -> 
NamespaceName.get(testTenant, ns))
+                        .map(NamespaceName::toString).toList();
+        assertEquals(namespacesResp, namespacesWithFullPath);
+    }
+
 }


Reply via email to