This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 614879d52357d625db342c880478665e740ede55 Author: Oneby <[email protected]> AuthorDate: Tue Nov 4 21:46:48 2025 +0800 [fix][admin] Set local policies overwrites "number of bundles" passed during namespace creation (#24762) Co-authored-by: oneby-wang <[email protected]> (cherry picked from commit 3983ff012dbf97a656dd99f68b63b79bdf6f1602) --- .../pulsar/broker/admin/impl/NamespacesBase.java | 34 +++-- .../pulsar/broker/admin/NamespacesV2Test.java | 149 +++++++++++++++++++++ .../loadbalance/ExtensibleLoadManagerTest.java | 15 ++- 3 files changed, 188 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 89f55789e3e..ffbc1b9510b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.admin.impl; import static org.apache.commons.lang3.StringUtils.isBlank; -import static org.apache.pulsar.common.policies.data.PoliciesUtil.defaultBundle; import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles; import com.google.common.collect.Sets; import java.lang.reflect.Field; @@ -962,9 +961,7 @@ public abstract class NamespacesBase extends AdminResource { bookieAffinityGroup, policies.namespaceAntiAffinityGroup, policies.migrated)) - .orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()), - bookieAffinityGroup, - null)); + .orElseGet(() -> new LocalPolicies(getDefaultBundleData(), bookieAffinityGroup, null)); log.info("[{}] Successfully updated local-policies configuration: namespace={}, map={}", clientAppId(), namespaceName, localPolicies); return localPolicies; @@ -973,6 +970,8 @@ public abstract class NamespacesBase extends AdminResource { log.warn("[{}] Failed to update local-policy configuration for namespace {}: does not exist", clientAppId(), namespaceName); throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); + } catch (RestException re) { + throw re; } catch (Exception e) { log.error("[{}] Failed to update local-policy configuration for namespace {}", clientAppId(), namespaceName, e); @@ -1789,11 +1788,12 @@ public abstract class NamespacesBase extends AdminResource { policies.bookieAffinityGroup, antiAffinityGroup, policies.migrated)) - .orElseGet(() -> new LocalPolicies(defaultBundle(), - null, antiAffinityGroup)) + .orElseGet(() -> new LocalPolicies(getDefaultBundleData(), null, antiAffinityGroup)) ); log.info("[{}] Successfully updated local-policies configuration: namespace={}, map={}", clientAppId(), namespaceName, antiAffinityGroup); + } catch (RestException re) { + throw re; } catch (Exception e) { log.error("[{}] Failed to update local-policy configuration for namespace {}", clientAppId(), namespaceName, e); @@ -2767,9 +2767,10 @@ public abstract class NamespacesBase extends AdminResource { policies.bookieAffinityGroup, policies.namespaceAntiAffinityGroup, migrated)) - .orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()), - null, null, migrated))); + .orElseGet(() -> new LocalPolicies(getDefaultBundleData(), null, null, migrated))); log.info("Successfully updated migration on namespace {}", namespaceName); + } catch (RestException re) { + throw re; } catch (Exception e) { log.error("Failed to update migration on namespace {}", namespaceName, e); throw new RestException(e); @@ -2869,5 +2870,22 @@ public abstract class NamespacesBase extends AdminResource { .thenApply(policies -> policies.allowed_clusters); } + // TODO remove this sync method after async refactor + private BundlesData getDefaultBundleData() { + try { + return getDefaultBundleDataAsync().get(config().getMetadataStoreOperationTimeoutSeconds(), + TimeUnit.SECONDS); + } catch (Exception e) { + log.error("[{}] Failed to get namespace-policy configuration for namespace {}", clientAppId(), + namespaceName, e); + throw new RestException(e); + } + } + + private CompletableFuture<BundlesData> getDefaultBundleDataAsync() { + return namespaceResources().getPoliciesAsync(namespaceName).thenApply( + optionalPolicies -> optionalPolicies.isPresent() ? optionalPolicies.get().bundles : + getBundles(config().getDefaultNumberOfNamespaceBundles())); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java index c1e8dfa3099..8ac8155614a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin; +import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -38,9 +39,12 @@ import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; +import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PolicyName; import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.PublishRate; @@ -236,4 +240,149 @@ public class NamespacesV2Test extends MockedPulsarServiceBaseTest { assertEquals(e.getResponse().getStatus(), Response.Status.PRECONDITION_FAILED.getStatusCode()); } } + + @Test + public void testSetBookieAffinityGroupWithEmptyPolicies() throws Exception { + // 1. create namespace with empty policies + String setBookieAffinityGroupNs = "test-set-bookie-affinity-group-ns"; + asyncRequests(response -> namespaces.createNamespace(response, testTenant, setBookieAffinityGroupNs, null)); + + // 2.set bookie affinity group + String primaryAffinityGroup = "primary-affinity-group"; + String secondaryAffinityGroup = "secondary-affinity-group"; + BookieAffinityGroupData bookieAffinityGroupDataReq = + BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(primaryAffinityGroup) + .bookkeeperAffinityGroupSecondary(secondaryAffinityGroup).build(); + namespaces.setBookieAffinityGroup(testTenant, setBookieAffinityGroupNs, bookieAffinityGroupDataReq); + + // 3.query namespace num bundles, should be conf.getDefaultNumberOfNamespaceBundles() + BundlesData bundlesData = (BundlesData) asyncRequests( + response -> namespaces.getBundlesData(response, testTenant, setBookieAffinityGroupNs)); + assertEquals(bundlesData.getNumBundles(), conf.getDefaultNumberOfNamespaceBundles()); + + // 4.assert namespace bookie affinity group + BookieAffinityGroupData bookieAffinityGroupDataResp = + namespaces.getBookieAffinityGroup(testTenant, setBookieAffinityGroupNs); + assertEquals(bookieAffinityGroupDataResp, bookieAffinityGroupDataReq); + } + + @Test + public void testSetBookieAffinityGroupWithExistBundlePolicies() throws Exception { + // 1. create namespace with specified num bundles + String setBookieAffinityGroupNs = "test-set-bookie-affinity-group-ns"; + Policies policies = new Policies(); + policies.bundles = getBundles(10); + asyncRequests(response -> namespaces.createNamespace(response, testTenant, setBookieAffinityGroupNs, policies)); + + // 2.set bookie affinity group + String primaryAffinityGroup = "primary-affinity-group"; + String secondaryAffinityGroup = "secondary-affinity-group"; + BookieAffinityGroupData bookieAffinityGroupDataReq = + BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(primaryAffinityGroup) + .bookkeeperAffinityGroupSecondary(secondaryAffinityGroup).build(); + namespaces.setBookieAffinityGroup(testTenant, setBookieAffinityGroupNs, bookieAffinityGroupDataReq); + + // 3.query namespace num bundles, should be policies.bundles, which we set before + BundlesData bundlesData = (BundlesData) asyncRequests( + response -> namespaces.getBundlesData(response, testTenant, setBookieAffinityGroupNs)); + assertEquals(bundlesData, policies.bundles); + + // 4.assert namespace bookie affinity group + BookieAffinityGroupData bookieAffinityGroupDataResp = + namespaces.getBookieAffinityGroup(testTenant, setBookieAffinityGroupNs); + assertEquals(bookieAffinityGroupDataResp, bookieAffinityGroupDataReq); + } + + @Test + public void testSetNamespaceAntiAffinityGroupWithEmptyPolicies() throws Exception { + // 1. create namespace with empty policies + String setNamespaceAntiAffinityGroupNs = "test-set-namespace-anti-affinity-group-ns"; + asyncRequests( + response -> namespaces.createNamespace(response, testTenant, setNamespaceAntiAffinityGroupNs, null)); + + // 2.set namespace anti affinity group + String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group"; + namespaces.setNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs, + namespaceAntiAffinityGroupReq); + + // 3.query namespace num bundles, should be conf.getDefaultNumberOfNamespaceBundles() + BundlesData bundlesData = (BundlesData) asyncRequests( + response -> namespaces.getBundlesData(response, testTenant, setNamespaceAntiAffinityGroupNs)); + assertEquals(bundlesData.getNumBundles(), conf.getDefaultNumberOfNamespaceBundles()); + + // 4.assert namespace anti affinity group + String namespaceAntiAffinityGroupResp = + namespaces.getNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs); + assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroupReq); + } + + @Test + public void testSetNamespaceAntiAffinityGroupWithExistBundlePolicies() throws Exception { + // 1. create namespace with specified num bundles + String setNamespaceAntiAffinityGroupNs = "test-set-namespace-anti-affinity-group-ns"; + Policies policies = new Policies(); + policies.bundles = getBundles(10); + asyncRequests(response -> namespaces.createNamespace(response, testTenant, setNamespaceAntiAffinityGroupNs, + policies)); + + // 2.set namespace anti affinity group + String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group"; + namespaces.setNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs, + namespaceAntiAffinityGroupReq); + + // 3.query namespace num bundles, should be policies.bundles, which we set before + BundlesData bundlesData = (BundlesData) asyncRequests( + response -> namespaces.getBundlesData(response, testTenant, setNamespaceAntiAffinityGroupNs)); + assertEquals(bundlesData, policies.bundles); + + // 4.assert namespace anti affinity group + String namespaceAntiAffinityGroupResp = + namespaces.getNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs); + assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroupReq); + } + + @Test + public void testEnableMigrationWithEmptyPolicies() throws Exception { + // 1. create namespace with empty policies + String enableMigrationGroupNs = "test-set-namespace-enable-migration-ns"; + asyncRequests(response -> namespaces.createNamespace(response, testTenant, enableMigrationGroupNs, null)); + + // 2.set enable migration + boolean enableMigrationReq = true; + namespaces.enableMigration(testTenant, enableMigrationGroupNs, enableMigrationReq); + + // 3.query namespace num bundles, should be conf.getDefaultNumberOfNamespaceBundles() + BundlesData bundlesData = (BundlesData) asyncRequests( + response -> namespaces.getBundlesData(response, testTenant, enableMigrationGroupNs)); + assertEquals(bundlesData.getNumBundles(), conf.getDefaultNumberOfNamespaceBundles()); + + // 4.assert namespace enable migration + Policies policiesResp = (Policies) asyncRequests( + response -> namespaces.getPolicies(response, testTenant, enableMigrationGroupNs)); + assertEquals(policiesResp.migrated, enableMigrationReq); + } + + @Test + public void testEnableMigrationWithExistBundlePolicies() throws Exception { + // 1. create namespace with specified num bundles + String enableMigrationGroupNs = "test-set-namespace-enable-migration-ns"; + Policies policiesReq = new Policies(); + policiesReq.bundles = getBundles(10); + asyncRequests( + response -> namespaces.createNamespace(response, testTenant, enableMigrationGroupNs, policiesReq)); + + // 2.set enable migration + boolean enableMigrationReq = true; + namespaces.enableMigration(testTenant, enableMigrationGroupNs, enableMigrationReq); + + // 3.query namespace num bundles, should be policies.bundles, which we set before + BundlesData bundlesData = (BundlesData) asyncRequests( + response -> namespaces.getBundlesData(response, testTenant, enableMigrationGroupNs)); + assertEquals(bundlesData, policiesReq.bundles); + + // 4.assert namespace enable migration + Policies policiesResp = (Policies) asyncRequests( + response -> namespaces.getPolicies(response, testTenant, enableMigrationGroupNs)); + assertEquals(policiesResp.migrated, enableMigrationReq); + } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java index 5f596cc0488..6861664a1dc 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java @@ -286,7 +286,7 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport { } @Test(timeOut = 80 * 1000) - public void testAntiaffinityPolicy() throws PulsarAdminException { + public void testAntiAffinityPolicy() throws PulsarAdminException { final String namespaceAntiAffinityGroup = "my-anti-affinity-filter"; final String antiAffinityEnabledNameSpace = DEFAULT_TENANT + "/my-ns-filter" + nsSuffix; final int numPartition = 20; @@ -295,14 +295,25 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport { assertEquals(activeBrokers.size(), NUM_BROKERS); + Set<String> antiAffinityEnabledNameSpacesReq = new HashSet<>(); for (int i = 0; i < activeBrokers.size(); i++) { String namespace = antiAffinityEnabledNameSpace + "-" + i; - admin.namespaces().createNamespace(namespace, 10); + antiAffinityEnabledNameSpacesReq.add(namespace); + admin.namespaces().createNamespace(namespace, 1); admin.namespaces().setNamespaceAntiAffinityGroup(namespace, namespaceAntiAffinityGroup); admin.clusters().createFailureDomain(clusterName, namespaceAntiAffinityGroup, FailureDomain.builder() .brokers(Set.of(activeBrokers.get(i))).build()); + String namespaceAntiAffinityGroupResp = admin.namespaces().getNamespaceAntiAffinityGroup(namespace); + assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroup); + FailureDomain failureDomainResp = + admin.clusters().getFailureDomain(clusterName, namespaceAntiAffinityGroup); + assertEquals(failureDomainResp.getBrokers(), Set.of(activeBrokers.get(i))); } + List<String> antiAffinityNamespacesResp = + admin.namespaces().getAntiAffinityNamespaces(DEFAULT_TENANT, clusterName, namespaceAntiAffinityGroup); + assertEquals(new HashSet<>(antiAffinityNamespacesResp), antiAffinityEnabledNameSpacesReq); + Set<String> result = new HashSet<>(); for (int i = 0; i < activeBrokers.size(); i++) { final String topic = "persistent://" + antiAffinityEnabledNameSpace + "-" + i +"/topic";
