This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit cd703562955947a9474aae0b78ccc20be0b027de Author: Oneby <[email protected]> AuthorDate: Wed Dec 10 23:24:36 2025 +0800 [fix][admin] Refactor bookie affinity group sync operations to async in rest api (#25050) Co-authored-by: oneby-wang <[email protected]> (cherry picked from commit 4e5364fa7e5ffb6b834231737165253e6f8377e7) --- .../broker/resources/LocalPoliciesResources.java | 5 ++ .../pulsar/broker/admin/impl/NamespacesBase.java | 77 +++++++++++----------- .../apache/pulsar/broker/admin/v1/Namespaces.java | 41 +++++++++--- .../apache/pulsar/broker/admin/v2/Namespaces.java | 41 +++++++++--- .../apache/pulsar/broker/admin/NamespacesTest.java | 34 ++++++++++ .../pulsar/broker/admin/NamespacesV2Test.java | 45 +++++++++++-- 6 files changed, 180 insertions(+), 63 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java index ae3479fde59..b7ef19ccbe8 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java @@ -55,6 +55,11 @@ public class LocalPoliciesResources extends BaseResources<LocalPolicies> { setWithCreate(joinPath(LOCAL_POLICIES_ROOT, ns.toString()), createFunction); } + public CompletableFuture<Void> setLocalPoliciesWithCreateAsync(NamespaceName ns, Function<Optional<LocalPolicies>, + LocalPolicies> createFunction) { + return setWithCreateAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString()), createFunction); + } + public CompletableFuture<Void> createLocalPoliciesAsync(NamespaceName ns, LocalPolicies policies) { return getCache().create(joinPath(LOCAL_POLICIES_ROOT, ns.toString()), policies); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 39718c3902e..0cb4f9a493d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1013,49 +1013,32 @@ public abstract class NamespacesBase extends AdminResource { }); } - - protected void internalSetBookieAffinityGroup(BookieAffinityGroupData bookieAffinityGroup) { - validateSuperUserAccess(); - log.info("[{}] Setting bookie-affinity-group {} for namespace {}", clientAppId(), bookieAffinityGroup, - this.namespaceName); - - if (namespaceName.isGlobal()) { - // check cluster ownership for a given global namespace: redirect if peer-cluster owns it - validateGlobalNamespaceOwnership(namespaceName); - } else { - validateClusterOwnership(namespaceName.getCluster()); - validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); - } - - try { - getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, oldPolicies -> { - LocalPolicies localPolicies = oldPolicies.map( - policies -> new LocalPolicies(policies.bundles, - bookieAffinityGroup, - policies.namespaceAntiAffinityGroup, - policies.migrated)) - .orElseGet(() -> new LocalPolicies(getDefaultBundleData(), bookieAffinityGroup, null)); - log.info("[{}] Successfully updated local-policies configuration: namespace={}, map={}", clientAppId(), - namespaceName, localPolicies); - return localPolicies; - }); - } catch (NotFoundException e) { - log.warn("[{}] Failed to update local-policy configuration for namespace {}: does not exist", clientAppId(), - namespaceName); - throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); - } catch (RestException re) { - throw re; - } catch (Exception e) { - log.error("[{}] Failed to update local-policy configuration for namespace {}", clientAppId(), namespaceName, - e); - throw new RestException(e); - } + protected CompletableFuture<Void> internalSetBookieAffinityGroupAsync(BookieAffinityGroupData bookieAffinityGroup) { + return validateSuperUserAccessAsync().thenCompose(__ -> { + log.info("[{}] Setting bookie affinity group {} for namespace {}", clientAppId(), bookieAffinityGroup, + this.namespaceName); + if (namespaceName.isGlobal()) { + // check cluster ownership for a given global namespace: redirect if peer-cluster owns it + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + return validateClusterOwnershipAsync(namespaceName.getCluster()).thenCompose( + unused -> validateClusterForTenantAsync(namespaceName.getTenant(), namespaceName.getCluster())); + } + }).thenCompose(__ -> getDefaultBundleDataAsync().thenCompose( + defaultBundleData -> getLocalPolicies().setLocalPoliciesWithCreateAsync(namespaceName, oldPolicies -> + oldPolicies.map(policies -> new LocalPolicies(policies.bundles, bookieAffinityGroup, + policies.namespaceAntiAffinityGroup, policies.migrated)) + .orElseGet(() -> new LocalPolicies(defaultBundleData, bookieAffinityGroup, null))))) + .thenAccept(__ -> log.info( + "[{}] Successfully updated bookie affinity group: namespace={}, bookieAffinityGroup={}", clientAppId(), + namespaceName, bookieAffinityGroup)); } - protected void internalDeleteBookieAffinityGroup() { - internalSetBookieAffinityGroup(null); + protected CompletableFuture<Void> internalDeleteBookieAffinityGroupAsync() { + return internalSetBookieAffinityGroupAsync(null); } + @Deprecated protected BookieAffinityGroupData internalGetBookieAffinityGroup() { validateSuperUserAccess(); @@ -1084,6 +1067,21 @@ public abstract class NamespacesBase extends AdminResource { } } + protected CompletableFuture<BookieAffinityGroupData> internalGetBookieAffinityGroupAsync() { + return validateSuperUserAccessAsync().thenCompose(__ -> { + if (namespaceName.isGlobal()) { + // check cluster ownership for a given global namespace: redirect if peer-cluster owns it + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + return validateClusterOwnershipAsync(namespaceName.getCluster()).thenCompose( + unused -> validateClusterForTenantAsync(namespaceName.getTenant(), namespaceName.getCluster())); + } + }).thenCompose(__ -> getLocalPolicies().getLocalPoliciesAsync(namespaceName)) + .thenApply(policies -> policies.orElseThrow( + () -> new RestException(Status.NOT_FOUND, "Namespace local-policies does not exist")) + .bookieAffinityGroup); + } + private CompletableFuture<Void> validateLeaderBrokerAsync() { if (this.isLeaderBroker()) { return CompletableFuture.completedFuture(null); @@ -2988,6 +2986,7 @@ public abstract class NamespacesBase extends AdminResource { } // TODO remove this sync method after async refactor + @Deprecated private BundlesData getDefaultBundleData() { try { return getDefaultBundleDataAsync().get(config().getMetadataStoreOperationTimeoutSeconds(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 7f19b5eb255..af714ac1aee 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -1249,10 +1249,18 @@ public class Namespaces extends NamespacesBase { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist"), @ApiResponse(code = 409, message = "Concurrent modification") }) - public void setBookieAffinityGroup(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, BookieAffinityGroupData bookieAffinityGroup) { + public void setBookieAffinityGroup(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property, + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, + BookieAffinityGroupData bookieAffinityGroup) { validateNamespaceName(property, cluster, namespace); - internalSetBookieAffinityGroup(bookieAffinityGroup); + internalSetBookieAffinityGroupAsync(bookieAffinityGroup) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to set bookie affinity group for namespace {}", clientAppId(), + namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET @@ -1263,10 +1271,17 @@ public class Namespaces extends NamespacesBase { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist"), @ApiResponse(code = 409, message = "Concurrent modification") }) - public BookieAffinityGroupData getBookieAffinityGroup(@PathParam("property") String property, - @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { + public void getBookieAffinityGroup(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property, + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - return internalGetBookieAffinityGroup(); + internalGetBookieAffinityGroupAsync() + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + log.error("[{}] Failed to get bookie affinity group for namespace {}", clientAppId(), + namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @DELETE @@ -1277,10 +1292,18 @@ public class Namespaces extends NamespacesBase { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist"), @ApiResponse(code = 409, message = "Concurrent modification") }) - public void deleteBookieAffinityGroup(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace) { + public void deleteBookieAffinityGroup(@Suspended AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - internalDeleteBookieAffinityGroup(); + internalDeleteBookieAffinityGroupAsync() + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to delete bookie affinity group for namespace {}", clientAppId(), + namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index a8ed06c7fcf..a8eea7cfb5b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -1421,11 +1421,19 @@ public class Namespaces extends NamespacesBase { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist"), @ApiResponse(code = 409, message = "Concurrent modification")}) - public void setBookieAffinityGroup(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, + public void setBookieAffinityGroup(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, @ApiParam(value = "Bookie affinity group for the specified namespace") - BookieAffinityGroupData bookieAffinityGroup) { + BookieAffinityGroupData bookieAffinityGroup) { validateNamespaceName(tenant, namespace); - internalSetBookieAffinityGroup(bookieAffinityGroup); + internalSetBookieAffinityGroupAsync(bookieAffinityGroup) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to set bookie affinity group for namespace {}", clientAppId(), + namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET @@ -1437,10 +1445,17 @@ public class Namespaces extends NamespacesBase { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist"), @ApiResponse(code = 409, message = "Concurrent modification") }) - public BookieAffinityGroupData getBookieAffinityGroup(@PathParam("property") String property, - @PathParam("namespace") String namespace) { + public void getBookieAffinityGroup(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property, + @PathParam("namespace") String namespace) { validateNamespaceName(property, namespace); - return internalGetBookieAffinityGroup(); + internalGetBookieAffinityGroupAsync() + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + log.error("[{}] Failed to get bookie affinity group for namespace {}", clientAppId(), + namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @DELETE @@ -1451,10 +1466,18 @@ public class Namespaces extends NamespacesBase { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist"), @ApiResponse(code = 409, message = "Concurrent modification") }) - public void deleteBookieAffinityGroup(@PathParam("property") String property, - @PathParam("namespace") String namespace) { + public void deleteBookieAffinityGroup(@Suspended AsyncResponse asyncResponse, + @PathParam("property") String property, + @PathParam("namespace") String namespace) { validateNamespaceName(property, namespace); - internalDeleteBookieAffinityGroup(); + internalDeleteBookieAffinityGroupAsync() + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to delete bookie affinity group for namespace {}", clientAppId(), + namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 75f0854970e..2aaa5676d5b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -2399,4 +2399,38 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { "Expected TLS service error, got: " + message); } } + + @Test + public void testSetAndDeleteBookieAffinityGroup() throws Exception { + // 1. create namespace with empty policies + String setBookieAffinityGroupNs = "test-set-bookie-affinity-group-ns"; + asyncRequests( + response -> namespaces.createNamespace(response, testTenant, testLocalCluster, setBookieAffinityGroupNs, + (Policies) null)); + + // 2.set bookie affinity group + String primaryAffinityGroup = "primary-affinity-group"; + String secondaryAffinityGroup = "secondary-affinity-group"; + BookieAffinityGroupData bookieAffinityGroupDataReq = + BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(primaryAffinityGroup) + .bookkeeperAffinityGroupSecondary(secondaryAffinityGroup).build(); + asyncRequests(response -> namespaces.setBookieAffinityGroup(response, testTenant, testLocalCluster, + setBookieAffinityGroupNs, bookieAffinityGroupDataReq)); + + // 3.assert namespace bookie affinity group + BookieAffinityGroupData bookieAffinityGroupDataResp = (BookieAffinityGroupData) asyncRequests( + response -> namespaces.getBookieAffinityGroup(response, testTenant, testLocalCluster, + setBookieAffinityGroupNs)); + assertEquals(bookieAffinityGroupDataResp, bookieAffinityGroupDataReq); + + // 4.delete bookie affinity group + asyncRequests(response -> namespaces.deleteBookieAffinityGroup(response, testTenant, testLocalCluster, + setBookieAffinityGroupNs)); + + // 5.assert namespace bookie affinity group + bookieAffinityGroupDataResp = (BookieAffinityGroupData) asyncRequests( + response -> namespaces.getBookieAffinityGroup(response, testTenant, testLocalCluster, + setBookieAffinityGroupNs)); + assertNull(bookieAffinityGroupDataResp); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java index 8ac8155614a..629e92c056f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.lang.reflect.Field; @@ -253,7 +254,8 @@ public class NamespacesV2Test extends MockedPulsarServiceBaseTest { BookieAffinityGroupData bookieAffinityGroupDataReq = BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(primaryAffinityGroup) .bookkeeperAffinityGroupSecondary(secondaryAffinityGroup).build(); - namespaces.setBookieAffinityGroup(testTenant, setBookieAffinityGroupNs, bookieAffinityGroupDataReq); + asyncRequests(response -> namespaces.setBookieAffinityGroup(response, testTenant, setBookieAffinityGroupNs, + bookieAffinityGroupDataReq)); // 3.query namespace num bundles, should be conf.getDefaultNumberOfNamespaceBundles() BundlesData bundlesData = (BundlesData) asyncRequests( @@ -261,8 +263,8 @@ public class NamespacesV2Test extends MockedPulsarServiceBaseTest { assertEquals(bundlesData.getNumBundles(), conf.getDefaultNumberOfNamespaceBundles()); // 4.assert namespace bookie affinity group - BookieAffinityGroupData bookieAffinityGroupDataResp = - namespaces.getBookieAffinityGroup(testTenant, setBookieAffinityGroupNs); + BookieAffinityGroupData bookieAffinityGroupDataResp = (BookieAffinityGroupData) asyncRequests( + response -> namespaces.getBookieAffinityGroup(response, testTenant, setBookieAffinityGroupNs)); assertEquals(bookieAffinityGroupDataResp, bookieAffinityGroupDataReq); } @@ -280,7 +282,8 @@ public class NamespacesV2Test extends MockedPulsarServiceBaseTest { BookieAffinityGroupData bookieAffinityGroupDataReq = BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(primaryAffinityGroup) .bookkeeperAffinityGroupSecondary(secondaryAffinityGroup).build(); - namespaces.setBookieAffinityGroup(testTenant, setBookieAffinityGroupNs, bookieAffinityGroupDataReq); + asyncRequests(response -> namespaces.setBookieAffinityGroup(response, testTenant, setBookieAffinityGroupNs, + bookieAffinityGroupDataReq)); // 3.query namespace num bundles, should be policies.bundles, which we set before BundlesData bundlesData = (BundlesData) asyncRequests( @@ -288,8 +291,8 @@ public class NamespacesV2Test extends MockedPulsarServiceBaseTest { assertEquals(bundlesData, policies.bundles); // 4.assert namespace bookie affinity group - BookieAffinityGroupData bookieAffinityGroupDataResp = - namespaces.getBookieAffinityGroup(testTenant, setBookieAffinityGroupNs); + BookieAffinityGroupData bookieAffinityGroupDataResp = (BookieAffinityGroupData) asyncRequests( + response -> namespaces.getBookieAffinityGroup(response, testTenant, setBookieAffinityGroupNs)); assertEquals(bookieAffinityGroupDataResp, bookieAffinityGroupDataReq); } @@ -385,4 +388,34 @@ public class NamespacesV2Test extends MockedPulsarServiceBaseTest { response -> namespaces.getPolicies(response, testTenant, enableMigrationGroupNs)); assertEquals(policiesResp.migrated, enableMigrationReq); } + + @Test + public void testSetAndDeleteBookieAffinityGroup() throws Exception { + // 1. create namespace with empty policies + String setBookieAffinityGroupNs = "test-set-bookie-affinity-group-ns"; + asyncRequests(response -> namespaces.createNamespace(response, testTenant, setBookieAffinityGroupNs, null)); + + // 2.set bookie affinity group + String primaryAffinityGroup = "primary-affinity-group"; + String secondaryAffinityGroup = "secondary-affinity-group"; + BookieAffinityGroupData bookieAffinityGroupDataReq = + BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(primaryAffinityGroup) + .bookkeeperAffinityGroupSecondary(secondaryAffinityGroup).build(); + asyncRequests(response -> namespaces.setBookieAffinityGroup(response, testTenant, setBookieAffinityGroupNs, + bookieAffinityGroupDataReq)); + + // 3.assert namespace bookie affinity group + BookieAffinityGroupData bookieAffinityGroupDataResp = (BookieAffinityGroupData) asyncRequests( + response -> namespaces.getBookieAffinityGroup(response, testTenant, setBookieAffinityGroupNs)); + assertEquals(bookieAffinityGroupDataResp, bookieAffinityGroupDataReq); + + // 4.delete bookie affinity group + asyncRequests(response -> namespaces.deleteBookieAffinityGroup(response, testTenant, setBookieAffinityGroupNs)); + + // 5.assert namespace bookie affinity group + bookieAffinityGroupDataResp = (BookieAffinityGroupData) asyncRequests( + response -> namespaces.getBookieAffinityGroup(response, testTenant, setBookieAffinityGroupNs)); + assertNull(bookieAffinityGroupDataResp); + } + }
