This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e041fab73c1 [fix][admin] Refactor namespace anti affinity group sync
operations to async in rest api (#25086)
e041fab73c1 is described below
commit e041fab73c1b9a6bb93457d186fc0a937beafd53
Author: Oneby Wang <[email protected]>
AuthorDate: Wed Dec 17 19:16:20 2025 +0800
[fix][admin] Refactor namespace anti affinity group sync operations to
async in rest api (#25086)
Co-authored-by: oneby-wang <[email protected]>
---
.../broker/resources/LocalPoliciesResources.java | 6 +-
.../apache/pulsar/broker/admin/AdminResource.java | 17 +-
.../pulsar/broker/admin/impl/NamespacesBase.java | 186 +++++++--------------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 55 ++++--
.../apache/pulsar/broker/admin/v2/Namespaces.java | 58 +++++--
.../apache/pulsar/broker/admin/NamespacesTest.java | 80 +++++++++
.../pulsar/broker/admin/NamespacesV2Test.java | 95 ++++++++++-
7 files changed, 332 insertions(+), 165 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
index b7ef19ccbe8..8e7b0ab0b1e 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
@@ -37,9 +37,9 @@ public class LocalPoliciesResources extends
BaseResources<LocalPolicies> {
super(localStore, LocalPolicies.class, operationTimeoutSec);
}
- public void setLocalPolicies(NamespaceName ns, Function<LocalPolicies,
LocalPolicies> modifyFunction)
- throws MetadataStoreException {
- set(joinPath(LOCAL_POLICIES_ROOT, ns.toString()), modifyFunction);
+ public CompletableFuture<Void> setLocalPoliciesAsync(NamespaceName ns,
+
Function<LocalPolicies, LocalPolicies> modifyFunction) {
+ return setAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString()),
modifyFunction);
}
public Optional<LocalPolicies> getLocalPolicies(NamespaceName ns) throws
MetadataStoreException{
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index bfa1fdc812b..9ea5b4c33cc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin;
+import static org.apache.commons.lang3.StringUtils.isBlank;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
@@ -498,14 +499,12 @@ public abstract class AdminResource extends
PulsarWebResource {
});
}
- protected void validateClusterExists(String cluster) {
- try {
- if (!clusterResources().getCluster(cluster).isPresent()) {
+ protected CompletableFuture<Void> validateClusterExistsAsync(String
cluster) {
+ return
clusterResources().clusterExistsAsync(cluster).thenAccept(clusterExist -> {
+ if (!clusterExist) {
throw new RestException(Status.PRECONDITION_FAILED, "Cluster "
+ cluster + " does not exist.");
}
- } catch (Exception e) {
- throw new RestException(e);
- }
+ });
}
protected Policies getNamespacePolicies(String tenant, String cluster,
String namespace) {
@@ -874,6 +873,12 @@ public abstract class AdminResource extends
PulsarWebResource {
}
}
+ protected void checkNotBlank(String str, String errorMessage) {
+ if (isBlank(str)) {
+ throw new RestException(Status.PRECONDITION_FAILED, errorMessage);
+ }
+ }
+
protected boolean isManagedLedgerNotFoundException(Throwable cause) {
return cause instanceof
ManagedLedgerException.MetadataNotFoundException
|| cause instanceof MetadataStoreException.NotFoundException;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0cb4f9a493d..bfbd6523481 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.admin.impl;
-import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
@@ -1038,35 +1037,6 @@ public abstract class NamespacesBase extends
AdminResource {
return internalSetBookieAffinityGroupAsync(null);
}
- @Deprecated
- protected BookieAffinityGroupData internalGetBookieAffinityGroup() {
- validateSuperUserAccess();
-
- if (namespaceName.isGlobal()) {
- // check cluster ownership for a given global namespace: redirect
if peer-cluster owns it
- validateGlobalNamespaceOwnership(namespaceName);
- } else {
- validateClusterOwnership(namespaceName.getCluster());
- validateClusterForTenant(namespaceName.getTenant(),
namespaceName.getCluster());
- }
- try {
- final BookieAffinityGroupData bookkeeperAffinityGroup =
getLocalPolicies().getLocalPolicies(namespaceName)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND,
- "Namespace local-policies does not
exist")).bookieAffinityGroup;
- return bookkeeperAffinityGroup;
- } catch (NotFoundException e) {
- log.warn("[{}] Failed to get local-policy configuration for
namespace {}: does not exist",
- clientAppId(), namespaceName);
- throw new RestException(Status.NOT_FOUND, "Namespace policies does
not exist");
- } catch (RestException re) {
- throw re;
- } catch (Exception e) {
- log.error("[{}] Failed to get local-policy configuration for
namespace {}", clientAppId(),
- namespaceName, e);
- throw new RestException(e);
- }
- }
-
protected CompletableFuture<BookieAffinityGroupData>
internalGetBookieAffinityGroupAsync() {
return validateSuperUserAccessAsync().thenCompose(__ -> {
if (namespaceName.isGlobal()) {
@@ -1077,9 +1047,8 @@ public abstract class NamespacesBase extends
AdminResource {
unused ->
validateClusterForTenantAsync(namespaceName.getTenant(),
namespaceName.getCluster()));
}
}).thenCompose(__ ->
getLocalPolicies().getLocalPoliciesAsync(namespaceName))
- .thenApply(policies -> policies.orElseThrow(
- () -> new RestException(Status.NOT_FOUND, "Namespace
local-policies does not exist"))
- .bookieAffinityGroup);
+ .thenApply(policies -> policies.orElseThrow(() -> new
RestException(Status.NOT_FOUND,
+ "Namespace local-policies does not
exist")).bookieAffinityGroup);
}
private CompletableFuture<Void> validateLeaderBrokerAsync() {
@@ -1842,104 +1811,69 @@ public abstract class NamespacesBase extends
AdminResource {
internalSetPolicies("delayed_delivery_policies",
delayedDeliveryPolicies);
}
- protected void internalSetNamespaceAntiAffinityGroup(String
antiAffinityGroup) {
- validateNamespacePolicyOperation(namespaceName,
PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
- checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be
null");
- validatePoliciesReadOnlyAccess();
-
- log.info("[{}] Setting anti-affinity group {} for {}", clientAppId(),
antiAffinityGroup, namespaceName);
-
- if (isBlank(antiAffinityGroup)) {
- throw new RestException(Status.PRECONDITION_FAILED,
"antiAffinityGroup can't be empty");
- }
-
- try {
- getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, (lp)->
- lp.map(policies -> new LocalPolicies(policies.bundles,
- policies.bookieAffinityGroup,
- antiAffinityGroup,
- policies.migrated))
- .orElseGet(() -> new
LocalPolicies(getDefaultBundleData(), null, antiAffinityGroup))
- );
- log.info("[{}] Successfully updated local-policies configuration:
namespace={}, map={}", clientAppId(),
- namespaceName, antiAffinityGroup);
- } catch (RestException re) {
- throw re;
- } catch (Exception e) {
- log.error("[{}] Failed to update local-policy configuration for
namespace {}", clientAppId(), namespaceName,
- e);
- throw new RestException(e);
- }
- }
-
- protected String internalGetNamespaceAntiAffinityGroup() {
- validateNamespacePolicyOperation(namespaceName,
PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
-
- try {
- return getLocalPolicies()
- .getLocalPolicies(namespaceName)
- .orElseGet(() -> new
LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles())
- , null, null)).namespaceAntiAffinityGroup;
- } catch (Exception e) {
- log.error("[{}] Failed to get the antiAffinityGroup of namespace
{}", clientAppId(), namespaceName, e);
- throw new RestException(Status.NOT_FOUND, "Couldn't find namespace
policies");
- }
- }
-
- protected void internalRemoveNamespaceAntiAffinityGroup() {
- validateNamespacePolicyOperation(namespaceName,
PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
- validatePoliciesReadOnlyAccess();
-
- log.info("[{}] Deleting anti-affinity group for {}", clientAppId(),
namespaceName);
-
- try {
- getLocalPolicies().setLocalPolicies(namespaceName, (policies)->
- new LocalPolicies(policies.bundles,
- policies.bookieAffinityGroup,
- null,
- policies.migrated));
- log.info("[{}] Successfully removed anti-affinity group for a
namespace={}", clientAppId(), namespaceName);
- } catch (Exception e) {
- log.error("[{}] Failed to remove anti-affinity group for namespace
{}", clientAppId(), namespaceName, e);
- throw new RestException(e);
- }
+ protected CompletableFuture<Void>
internalSetNamespaceAntiAffinityGroupAsync(String antiAffinityGroup) {
+ checkNotNull(antiAffinityGroup, "Anti-affinity group should not be
null");
+ checkNotBlank(antiAffinityGroup, "Anti-affinity group can't be empty");
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE)
+ .thenCompose(__ ->
validatePoliciesReadOnlyAccessAsync()).thenCompose(
+ __ -> getDefaultBundleDataAsync().thenCompose(
+ defaultBundleData ->
getLocalPolicies().setLocalPoliciesWithCreateAsync(namespaceName,
+ oldPolicies ->
oldPolicies.map(policies -> new LocalPolicies(policies.bundles,
+
policies.bookieAffinityGroup, antiAffinityGroup,
+ policies.migrated))
+ .orElseGet(() -> new
LocalPolicies(defaultBundleData, null,
+ antiAffinityGroup)))))
+ .thenAccept(__ -> log.info(
+ "[{}] Successfully updated namespace anti-affinity
group, namespace={}, anti-affinity"
+ + " group={}", clientAppId(), namespaceName,
antiAffinityGroup));
+ }
+
+ protected CompletableFuture<String>
internalGetNamespaceAntiAffinityGroupAsync() {
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.ANTI_AFFINITY, PolicyOperation.READ)
+ .thenCompose(__ ->
getLocalPolicies().getLocalPoliciesAsync(namespaceName)
+ .thenApply(policiesOpt -> policiesOpt.map(localPolicies ->
localPolicies.namespaceAntiAffinityGroup)
+ .orElse(null)));
+ }
+
+ protected CompletableFuture<Void>
internalRemoveNamespaceAntiAffinityGroupAsync() {
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE)
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> {
+ log.info("[{}] Removing anti-affinity group for namespace:
{}", clientAppId(), namespaceName);
+ return
getLocalPolicies().setLocalPoliciesAsync(namespaceName,
+ (policies) -> new LocalPolicies(policies.bundles,
policies.bookieAffinityGroup, null,
+ policies.migrated));
+ })
+ .thenAccept(__ -> log.info("[{}] Successfully removed
anti-affinity group for namespace: {}",
+ clientAppId(), namespaceName));
}
- protected List<String> internalGetAntiAffinityNamespaces(String cluster,
String antiAffinityGroup,
- String tenant) {
- validateNamespacePolicyOperation(namespaceName,
PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
+ protected CompletableFuture<List<String>>
internalGetAntiAffinityNamespacesAsync(String cluster,
+
String antiAffinityGroup,
+
String tenant) {
checkNotNull(cluster, "Cluster should not be null");
- checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be
null");
+ checkNotNull(antiAffinityGroup, "Anti-affinity group should not be
null");
checkNotNull(tenant, "Tenant should not be null");
+ checkNotBlank(antiAffinityGroup, "Anti-affinity group can't be empty");
- log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(),
tenant, antiAffinityGroup, cluster);
-
- if (isBlank(antiAffinityGroup)) {
- throw new RestException(Status.PRECONDITION_FAILED, "anti-affinity
group can't be empty.");
- }
- validateClusterExists(cluster);
-
- try {
- List<String> namespaces =
tenantResources().getListOfNamespaces(tenant);
-
- return namespaces.stream().filter(ns -> {
- Optional<LocalPolicies> policies;
- try {
- policies =
getLocalPolicies().getLocalPolicies(NamespaceName.get(ns));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- String storedAntiAffinityGroup = policies.orElseGet(() ->
- new
LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
- null, null)).namespaceAntiAffinityGroup;
- return
antiAffinityGroup.equalsIgnoreCase(storedAntiAffinityGroup);
- }).collect(Collectors.toList());
-
- } catch (Exception e) {
- log.warn("Failed to list of properties/namespace from global-zk",
e);
- throw new RestException(e);
- }
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.ANTI_AFFINITY, PolicyOperation.READ)
+ .thenCompose(__ -> validateClusterExistsAsync(cluster))
+ .thenCompose(__ -> {
+ log.info("[{}]-{} Finding namespaces for {} in {}",
clientAppId(), tenant, antiAffinityGroup,
+ cluster);
+ return
tenantResources().getListOfNamespacesAsync(tenant).thenCompose(namespaces -> {
+ List<CompletableFuture<String>> nsFutures =
namespaces.stream()
+ .map(ns ->
getLocalPolicies().getLocalPoliciesAsync(NamespaceName.get(ns))
+ .thenApply(policiesOpt ->
policiesOpt.map(
+ localPolicies ->
localPolicies.namespaceAntiAffinityGroup).orElse(null))
+
.thenApply(antiAffinityGroup::equalsIgnoreCase)
+ .thenApply(equals -> equals ? ns :
null)).toList();
+ CompletableFuture<Void> allFuture =
FutureUtil.waitForAll(nsFutures);
+ return allFuture.thenApply(
+ unused ->
nsFutures.stream().map(CompletableFuture::join).filter(Objects::nonNull)
+ .toList());
+ });
+ });
}
private boolean checkQuotas(Policies policies, RetentionPolicies
retention) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index c6b8dbf5d2d..cad6899c8a2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -586,10 +586,19 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist"),
@ApiResponse(code = 412, message = "Invalid antiAffinityGroup") })
- public void setNamespaceAntiAffinityGroup(@PathParam("property") String
property,
- @PathParam("cluster") String cluster, @PathParam("namespace")
String namespace, String antiAffinityGroup) {
+ public void setNamespaceAntiAffinityGroup(@Suspended AsyncResponse
asyncResponse,
+ @PathParam("property") String
property,
+ @PathParam("cluster") String
cluster,
+ @PathParam("namespace") String
namespace, String antiAffinityGroup) {
validateNamespaceName(property, cluster, namespace);
- internalSetNamespaceAntiAffinityGroup(antiAffinityGroup);
+ internalSetNamespaceAntiAffinityGroupAsync(antiAffinityGroup)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to set namespace anti-affinity
group, tenant: {}, namespace: {}, "
+ + "antiAffinityGroup: {}", clientAppId(),
property, namespace, antiAffinityGroup, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -597,10 +606,19 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Get anti-affinity group of a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist") })
- public String getNamespaceAntiAffinityGroup(@PathParam("property") String
property,
- @PathParam("cluster") String cluster, @PathParam("namespace")
String namespace) {
+ public void getNamespaceAntiAffinityGroup(@Suspended AsyncResponse
asyncResponse,
+ @PathParam("property") String
property,
+ @PathParam("cluster") String
cluster,
+ @PathParam("namespace") String
namespace) {
validateNamespaceName(property, cluster, namespace);
- return internalGetNamespaceAntiAffinityGroup();
+ internalGetNamespaceAntiAffinityGroupAsync()
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get namespace anti-affinity
group, tenant: {}, namespace: {}",
+ clientAppId(), property, namespace, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -609,10 +627,19 @@ public class Namespaces extends NamespacesBase {
+ " api can be only accessed by admin of any of the existing
property")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 412, message = "Cluster not
exist/Anti-affinity group can't be empty.")})
- public List<String> getAntiAffinityNamespaces(@PathParam("cluster") String
cluster,
+ public void getAntiAffinityNamespaces(@Suspended AsyncResponse
asyncResponse,
+ @PathParam("cluster") String
cluster,
@PathParam("group") String
antiAffinityGroup,
@QueryParam("property")
String property) {
- return internalGetAntiAffinityNamespaces(cluster, antiAffinityGroup,
property);
+ internalGetAntiAffinityNamespacesAsync(cluster, antiAffinityGroup,
property)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get all namespaces in cluster of
given anti-affinity group, cluster: {}, "
+ + "tenant: {}, antiAffinityGroup: {}",
clientAppId(), cluster, property, antiAffinityGroup,
+ ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
@@ -621,11 +648,19 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification")})
- public void removeNamespaceAntiAffinityGroup(@PathParam("property") String
property,
+ public void removeNamespaceAntiAffinityGroup(@Suspended AsyncResponse
asyncResponse,
+ @PathParam("property") String
property,
@PathParam("cluster") String
cluster,
@PathParam("namespace")
String namespace) {
validateNamespaceName(property, cluster, namespace);
- internalRemoveNamespaceAntiAffinityGroup();
+ internalRemoveNamespaceAntiAffinityGroupAsync()
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to remove namespace anti-affinity
group, tenant: {}, namespace: {}",
+ clientAppId(), property, namespace, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 62bf5cd7aea..90f4b087bfe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -2138,13 +2138,21 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist"),
@ApiResponse(code = 412, message = "Invalid antiAffinityGroup")})
- public void setNamespaceAntiAffinityGroup(@PathParam("tenant") String
tenant,
+ public void setNamespaceAntiAffinityGroup(@Suspended AsyncResponse
asyncResponse,
+ @PathParam("tenant") String
tenant,
@PathParam("namespace") String
namespace,
@ApiParam(value = "Anti-affinity
group for the specified namespace",
required = true)
- String
antiAffinityGroup) {
+ String antiAffinityGroup) {
validateNamespaceName(tenant, namespace);
- internalSetNamespaceAntiAffinityGroup(antiAffinityGroup);
+ internalSetNamespaceAntiAffinityGroupAsync(antiAffinityGroup)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to set namespace anti-affinity
group, tenant: {}, namespace: {}, "
+ + "antiAffinityGroup: {}", clientAppId(), tenant,
namespace, antiAffinityGroup, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -2152,10 +2160,18 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Get anti-affinity group of a namespace.", response
= String.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist") })
- public String getNamespaceAntiAffinityGroup(@PathParam("tenant") String
tenant,
- @PathParam("namespace") String namespace) {
+ public void getNamespaceAntiAffinityGroup(@Suspended AsyncResponse
asyncResponse,
+ @PathParam("tenant") String
tenant,
+ @PathParam("namespace") String
namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetNamespaceAntiAffinityGroup();
+ internalGetNamespaceAntiAffinityGroupAsync()
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get namespace anti-affinity
group, tenant: {}, namespace: {}",
+ clientAppId(), tenant, namespace, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
@@ -2166,10 +2182,18 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
- public void removeNamespaceAntiAffinityGroup(@PathParam("tenant") String
tenant,
- @PathParam("namespace") String namespace) {
+ public void removeNamespaceAntiAffinityGroup(@Suspended AsyncResponse
asyncResponse,
+ @PathParam("tenant") String
tenant,
+ @PathParam("namespace")
String namespace) {
validateNamespaceName(tenant, namespace);
- internalRemoveNamespaceAntiAffinityGroup();
+ internalRemoveNamespaceAntiAffinityGroupAsync()
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to remove namespace anti-affinity
group, tenant: {}, namespace: {}",
+ clientAppId(), tenant, namespace, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -2179,9 +2203,19 @@ public class Namespaces extends NamespacesBase {
response = String.class, responseContainer = "List")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 412, message = "Cluster not
exist/Anti-affinity group can't be empty.")})
- public List<String> getAntiAffinityNamespaces(@PathParam("cluster") String
cluster,
- @PathParam("group") String antiAffinityGroup,
@QueryParam("tenant") String tenant) {
- return internalGetAntiAffinityNamespaces(cluster, antiAffinityGroup,
tenant);
+ public void getAntiAffinityNamespaces(@Suspended AsyncResponse
asyncResponse,
+ @PathParam("cluster") String
cluster,
+ @PathParam("group") String
antiAffinityGroup,
+ @QueryParam("tenant") String
tenant) {
+ internalGetAntiAffinityNamespacesAsync(cluster, antiAffinityGroup,
tenant)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get all namespaces in cluster of
given anti-affinity group, cluster: {}, "
+ + "tenant: {}, antiAffinityGroup: {}",
clientAppId(), cluster, tenant, antiAffinityGroup,
+ ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 2aaa5676d5b..2f5b99bfd9d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -2433,4 +2433,84 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
setBookieAffinityGroupNs));
assertNull(bookieAffinityGroupDataResp);
}
+
+ @Test
+ public void testSetAndDeleteNamespaceAntiAffinityGroup() throws Exception {
+ // 1. create namespace with empty policies, namespace anti affinity
group should be null
+ String setNamespaceAntiAffinityGroupNs =
"test-set-namespace-anti-affinity-group-ns";
+ asyncRequests(response -> namespaces.createNamespace(response,
testTenant, testLocalCluster,
+ setNamespaceAntiAffinityGroupNs, (Policies) null));
+ String namespaceAntiAffinityGroupResp = (String) asyncRequests(
+ response -> namespaces.getNamespaceAntiAffinityGroup(response,
testTenant, testLocalCluster,
+ setNamespaceAntiAffinityGroupNs));
+ assertNull(namespaceAntiAffinityGroupResp);
+
+ // 2.set namespace anti affinity group
+ String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
+ asyncRequests(response ->
namespaces.setNamespaceAntiAffinityGroup(response, testTenant, testLocalCluster,
+ setNamespaceAntiAffinityGroupNs,
namespaceAntiAffinityGroupReq));
+
+ // 3.assert namespace anti affinity group
+ namespaceAntiAffinityGroupResp = (String) asyncRequests(
+ response -> namespaces.getNamespaceAntiAffinityGroup(response,
testTenant, testLocalCluster,
+ setNamespaceAntiAffinityGroupNs));
+ assertEquals(namespaceAntiAffinityGroupResp,
namespaceAntiAffinityGroupReq);
+
+ // 4.delete namespace anti affinity group
+ asyncRequests(response ->
namespaces.removeNamespaceAntiAffinityGroup(response, testTenant,
testLocalCluster,
+ setNamespaceAntiAffinityGroupNs));
+
+ // 5.assert namespace anti affinity group
+ namespaceAntiAffinityGroupResp = (String) asyncRequests(
+ response -> namespaces.getNamespaceAntiAffinityGroup(response,
testTenant, testLocalCluster,
+ setNamespaceAntiAffinityGroupNs));
+ assertNull(namespaceAntiAffinityGroupResp);
+ }
+
+ @Test
+ public void testGetClusterAntiAffinityNamespaces() throws Exception {
+ // create 5 namespaces, 3 namespaces are set to the same namespace
anti affinity group,
+ // 2 namespaces are not set to any anti affinity group
+ String namespaceWithAntiAffinity1 = "namespace-with-anti-affinity-1";
+ String namespaceWithAntiAffinity2 = "namespace-with-anti-affinity-2";
+ String namespaceWithAntiAffinity3 = "namespace-with-anti-affinity-3";
+ String namespaceWithoutAntiAffinity1 =
"namespace-without-anti-affinity-1";
+ String namespaceWithoutAntiAffinity2 =
"namespace-without-anti-affinity-2";
+
+ // create namespaces
+ List<String> allNamespaces =
+ List.of(namespaceWithAntiAffinity1,
namespaceWithAntiAffinity2, namespaceWithAntiAffinity3,
+ namespaceWithoutAntiAffinity1,
namespaceWithoutAntiAffinity2);
+ for (String namespace : allNamespaces) {
+ asyncRequests(response -> namespaces.createNamespace(response,
testTenant, testLocalCluster, namespace,
+ (Policies) null));
+ }
+
+ // set namespace anti affinity group
+ String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
+ List<String> namespacesWithAntiAffinityGroup =
+ List.of(namespaceWithAntiAffinity1,
namespaceWithAntiAffinity2, namespaceWithAntiAffinity3);
+ for (String namespace : namespacesWithAntiAffinityGroup) {
+ asyncRequests(response ->
namespaces.setNamespaceAntiAffinityGroup(response, testTenant, testLocalCluster,
+ namespace, namespaceAntiAffinityGroupReq));
+ }
+
+ // assert namespace anti affinity group
+ for (String namespace : namespacesWithAntiAffinityGroup) {
+ String namespaceAntiAffinityGroupResp = (String) asyncRequests(
+ response ->
namespaces.getNamespaceAntiAffinityGroup(response, testTenant, testLocalCluster,
+ namespace));
+ assertEquals(namespaceAntiAffinityGroupResp,
namespaceAntiAffinityGroupReq);
+ }
+
+ // get namespaces in cluster of given anti affinity group
+ List<String> namespacesResp = (List<String>) asyncRequests(
+ response -> namespaces.getAntiAffinityNamespaces(response,
testLocalCluster,
+ namespaceAntiAffinityGroupReq, testTenant));
+ List<String> namespacesWithFullPath =
+ namespacesWithAntiAffinityGroup.stream().map(ns ->
NamespaceName.get(testTenant, testLocalCluster, ns))
+ .map(NamespaceName::toString).toList();
+ assertEquals(namespacesResp, namespacesWithFullPath);
+ }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
index 629e92c056f..596cfa3f396 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
@@ -305,8 +305,8 @@ public class NamespacesV2Test extends
MockedPulsarServiceBaseTest {
// 2.set namespace anti affinity group
String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
- namespaces.setNamespaceAntiAffinityGroup(testTenant,
setNamespaceAntiAffinityGroupNs,
- namespaceAntiAffinityGroupReq);
+ asyncRequests(response ->
namespaces.setNamespaceAntiAffinityGroup(response, testTenant,
+ setNamespaceAntiAffinityGroupNs,
namespaceAntiAffinityGroupReq));
// 3.query namespace num bundles, should be
conf.getDefaultNumberOfNamespaceBundles()
BundlesData bundlesData = (BundlesData) asyncRequests(
@@ -314,8 +314,9 @@ public class NamespacesV2Test extends
MockedPulsarServiceBaseTest {
assertEquals(bundlesData.getNumBundles(),
conf.getDefaultNumberOfNamespaceBundles());
// 4.assert namespace anti affinity group
- String namespaceAntiAffinityGroupResp =
- namespaces.getNamespaceAntiAffinityGroup(testTenant,
setNamespaceAntiAffinityGroupNs);
+ String namespaceAntiAffinityGroupResp = (String) asyncRequests(
+ response -> namespaces.getNamespaceAntiAffinityGroup(response,
testTenant,
+ setNamespaceAntiAffinityGroupNs));
assertEquals(namespaceAntiAffinityGroupResp,
namespaceAntiAffinityGroupReq);
}
@@ -330,8 +331,8 @@ public class NamespacesV2Test extends
MockedPulsarServiceBaseTest {
// 2.set namespace anti affinity group
String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
- namespaces.setNamespaceAntiAffinityGroup(testTenant,
setNamespaceAntiAffinityGroupNs,
- namespaceAntiAffinityGroupReq);
+ asyncRequests(response ->
namespaces.setNamespaceAntiAffinityGroup(response, testTenant,
+ setNamespaceAntiAffinityGroupNs,
namespaceAntiAffinityGroupReq));
// 3.query namespace num bundles, should be policies.bundles, which we
set before
BundlesData bundlesData = (BundlesData) asyncRequests(
@@ -339,8 +340,9 @@ public class NamespacesV2Test extends
MockedPulsarServiceBaseTest {
assertEquals(bundlesData, policies.bundles);
// 4.assert namespace anti affinity group
- String namespaceAntiAffinityGroupResp =
- namespaces.getNamespaceAntiAffinityGroup(testTenant,
setNamespaceAntiAffinityGroupNs);
+ String namespaceAntiAffinityGroupResp = (String) asyncRequests(
+ response -> namespaces.getNamespaceAntiAffinityGroup(response,
testTenant,
+ setNamespaceAntiAffinityGroupNs));
assertEquals(namespaceAntiAffinityGroupResp,
namespaceAntiAffinityGroupReq);
}
@@ -418,4 +420,81 @@ public class NamespacesV2Test extends
MockedPulsarServiceBaseTest {
assertNull(bookieAffinityGroupDataResp);
}
+ @Test
+ public void testSetAndDeleteNamespaceAntiAffinityGroup() throws Exception {
+ // 1. create namespace with empty policies, namespace anti affinity
group should be null
+ String setNamespaceAntiAffinityGroupNs =
"test-set-namespace-anti-affinity-group-ns";
+ asyncRequests(
+ response -> namespaces.createNamespace(response, testTenant,
setNamespaceAntiAffinityGroupNs, null));
+ String namespaceAntiAffinityGroupResp = (String) asyncRequests(
+ response -> namespaces.getNamespaceAntiAffinityGroup(response,
testTenant,
+ setNamespaceAntiAffinityGroupNs));
+ assertNull(namespaceAntiAffinityGroupResp);
+
+ // 2.set namespace anti affinity group
+ String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
+ asyncRequests(response ->
namespaces.setNamespaceAntiAffinityGroup(response, testTenant,
+ setNamespaceAntiAffinityGroupNs,
namespaceAntiAffinityGroupReq));
+
+ // 3.assert namespace anti affinity group
+ namespaceAntiAffinityGroupResp = (String) asyncRequests(
+ response -> namespaces.getNamespaceAntiAffinityGroup(response,
testTenant,
+ setNamespaceAntiAffinityGroupNs));
+ assertEquals(namespaceAntiAffinityGroupResp,
namespaceAntiAffinityGroupReq);
+
+ // 4.delete namespace anti affinity group
+ asyncRequests(response ->
namespaces.removeNamespaceAntiAffinityGroup(response, testTenant,
+ setNamespaceAntiAffinityGroupNs));
+
+ // 5.assert namespace anti affinity group
+ namespaceAntiAffinityGroupResp = (String) asyncRequests(
+ response -> namespaces.getNamespaceAntiAffinityGroup(response,
testTenant,
+ setNamespaceAntiAffinityGroupNs));
+ assertNull(namespaceAntiAffinityGroupResp);
+ }
+
+ @Test
+ public void testGetClusterAntiAffinityNamespaces() throws Exception {
+ // create 5 namespaces, 3 namespaces are set to the same namespace
anti affinity group,
+ // 2 namespaces are not set to any anti affinity group
+ String namespaceWithAntiAffinity1 = "namespace-with-anti-affinity-1";
+ String namespaceWithAntiAffinity2 = "namespace-with-anti-affinity-2";
+ String namespaceWithAntiAffinity3 = "namespace-with-anti-affinity-3";
+ String namespaceWithoutAntiAffinity1 =
"namespace-without-anti-affinity-1";
+ String namespaceWithoutAntiAffinity2 =
"namespace-without-anti-affinity-2";
+
+ // create namespaces
+ List<String> allNamespaces =
+ List.of(namespaceWithAntiAffinity1,
namespaceWithAntiAffinity2, namespaceWithAntiAffinity3,
+ namespaceWithoutAntiAffinity1,
namespaceWithoutAntiAffinity2);
+ for (String namespace : allNamespaces) {
+ asyncRequests(response -> namespaces.createNamespace(response,
testTenant, namespace, null));
+ }
+
+ // set namespace anti affinity group
+ String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
+ List<String> namespacesWithAntiAffinityGroup =
+ List.of(namespaceWithAntiAffinity1,
namespaceWithAntiAffinity2, namespaceWithAntiAffinity3);
+ for (String namespace : namespacesWithAntiAffinityGroup) {
+ asyncRequests(response ->
namespaces.setNamespaceAntiAffinityGroup(response, testTenant, namespace,
+ namespaceAntiAffinityGroupReq));
+ }
+
+ // assert namespace anti affinity group
+ for (String namespace : namespacesWithAntiAffinityGroup) {
+ String namespaceAntiAffinityGroupResp = (String) asyncRequests(
+ response ->
namespaces.getNamespaceAntiAffinityGroup(response, testTenant, namespace));
+ assertEquals(namespaceAntiAffinityGroupResp,
namespaceAntiAffinityGroupReq);
+ }
+
+ // get namespaces in cluster of given anti affinity group
+ List<String> namespacesResp = (List<String>) asyncRequests(
+ response -> namespaces.getAntiAffinityNamespaces(response,
testLocalCluster,
+ namespaceAntiAffinityGroupReq, testTenant));
+ List<String> namespacesWithFullPath =
+ namespacesWithAntiAffinityGroup.stream().map(ns ->
NamespaceName.get(testTenant, ns))
+ .map(NamespaceName::toString).toList();
+ assertEquals(namespacesResp, namespacesWithFullPath);
+ }
+
}