This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 614879d52357d625db342c880478665e740ede55
Author: Oneby <[email protected]>
AuthorDate: Tue Nov 4 21:46:48 2025 +0800

    [fix][admin] Set local policies overwrites "number of bundles" passed 
during namespace creation (#24762)
    
    Co-authored-by: oneby-wang <[email protected]>
    (cherry picked from commit 3983ff012dbf97a656dd99f68b63b79bdf6f1602)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  34 +++--
 .../pulsar/broker/admin/NamespacesV2Test.java      | 149 +++++++++++++++++++++
 .../loadbalance/ExtensibleLoadManagerTest.java     |  15 ++-
 3 files changed, 188 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 89f55789e3e..ffbc1b9510b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.broker.admin.impl;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
-import static 
org.apache.pulsar.common.policies.data.PoliciesUtil.defaultBundle;
 import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
@@ -962,9 +961,7 @@ public abstract class NamespacesBase extends AdminResource {
                                 bookieAffinityGroup,
                                 policies.namespaceAntiAffinityGroup,
                                 policies.migrated))
-                        .orElseGet(() -> new 
LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
-                                bookieAffinityGroup,
-                                null));
+                        .orElseGet(() -> new 
LocalPolicies(getDefaultBundleData(), bookieAffinityGroup, null));
                 log.info("[{}] Successfully updated local-policies 
configuration: namespace={}, map={}", clientAppId(),
                         namespaceName, localPolicies);
                 return localPolicies;
@@ -973,6 +970,8 @@ public abstract class NamespacesBase extends AdminResource {
             log.warn("[{}] Failed to update local-policy configuration for 
namespace {}: does not exist", clientAppId(),
                     namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not 
exist");
+        } catch (RestException re) {
+            throw re;
         } catch (Exception e) {
             log.error("[{}] Failed to update local-policy configuration for 
namespace {}", clientAppId(), namespaceName,
                     e);
@@ -1789,11 +1788,12 @@ public abstract class NamespacesBase extends 
AdminResource {
                         policies.bookieAffinityGroup,
                         antiAffinityGroup,
                         policies.migrated))
-                        .orElseGet(() -> new LocalPolicies(defaultBundle(),
-                                null, antiAffinityGroup))
+                        .orElseGet(() -> new 
LocalPolicies(getDefaultBundleData(), null, antiAffinityGroup))
             );
             log.info("[{}] Successfully updated local-policies configuration: 
namespace={}, map={}", clientAppId(),
                     namespaceName, antiAffinityGroup);
+        } catch (RestException re) {
+            throw re;
         } catch (Exception e) {
             log.error("[{}] Failed to update local-policy configuration for 
namespace {}", clientAppId(), namespaceName,
                     e);
@@ -2767,9 +2767,10 @@ public abstract class NamespacesBase extends 
AdminResource {
                             policies.bookieAffinityGroup,
                             policies.namespaceAntiAffinityGroup,
                             migrated))
-                    .orElseGet(() -> new 
LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
-                            null, null, migrated)));
+                    .orElseGet(() -> new LocalPolicies(getDefaultBundleData(), 
null, null, migrated)));
             log.info("Successfully updated migration on namespace {}", 
namespaceName);
+        } catch (RestException re) {
+            throw re;
         } catch (Exception e) {
             log.error("Failed to update migration on namespace {}", 
namespaceName, e);
             throw new RestException(e);
@@ -2869,5 +2870,22 @@ public abstract class NamespacesBase extends 
AdminResource {
                 .thenApply(policies -> policies.allowed_clusters);
     }
 
+    // TODO remove this sync method after async refactor
+    private BundlesData getDefaultBundleData() {
+        try {
+            return 
getDefaultBundleDataAsync().get(config().getMetadataStoreOperationTimeoutSeconds(),
+                    TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.error("[{}] Failed to get namespace-policy configuration for 
namespace {}", clientAppId(),
+                    namespaceName, e);
+            throw new RestException(e);
+        }
+    }
+
+    private CompletableFuture<BundlesData> getDefaultBundleDataAsync() {
+        return namespaceResources().getPoliciesAsync(namespaceName).thenApply(
+                optionalPolicies -> optionalPolicies.isPresent() ? 
optionalPolicies.get().bundles :
+                        
getBundles(config().getDefaultNumberOfNamespaceBundles()));
+    }
 
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
index c1e8dfa3099..8ac8155614a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -38,9 +39,12 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
+import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PolicyName;
 import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.PublishRate;
@@ -236,4 +240,149 @@ public class NamespacesV2Test extends 
MockedPulsarServiceBaseTest {
             assertEquals(e.getResponse().getStatus(), 
Response.Status.PRECONDITION_FAILED.getStatusCode());
         }
     }
+
+    @Test
+    public void testSetBookieAffinityGroupWithEmptyPolicies() throws Exception 
{
+        // 1. create namespace with empty policies
+        String setBookieAffinityGroupNs = "test-set-bookie-affinity-group-ns";
+        asyncRequests(response -> namespaces.createNamespace(response, 
testTenant, setBookieAffinityGroupNs, null));
+
+        // 2.set bookie affinity group
+        String primaryAffinityGroup = "primary-affinity-group";
+        String secondaryAffinityGroup = "secondary-affinity-group";
+        BookieAffinityGroupData bookieAffinityGroupDataReq =
+                
BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(primaryAffinityGroup)
+                        
.bookkeeperAffinityGroupSecondary(secondaryAffinityGroup).build();
+        namespaces.setBookieAffinityGroup(testTenant, 
setBookieAffinityGroupNs, bookieAffinityGroupDataReq);
+
+        // 3.query namespace num bundles, should be 
conf.getDefaultNumberOfNamespaceBundles()
+        BundlesData bundlesData = (BundlesData) asyncRequests(
+                response -> namespaces.getBundlesData(response, testTenant, 
setBookieAffinityGroupNs));
+        assertEquals(bundlesData.getNumBundles(), 
conf.getDefaultNumberOfNamespaceBundles());
+
+        // 4.assert namespace bookie affinity group
+        BookieAffinityGroupData bookieAffinityGroupDataResp =
+                namespaces.getBookieAffinityGroup(testTenant, 
setBookieAffinityGroupNs);
+        assertEquals(bookieAffinityGroupDataResp, bookieAffinityGroupDataReq);
+    }
+
+    @Test
+    public void testSetBookieAffinityGroupWithExistBundlePolicies() throws 
Exception {
+        // 1. create namespace with specified num bundles
+        String setBookieAffinityGroupNs = "test-set-bookie-affinity-group-ns";
+        Policies policies = new Policies();
+        policies.bundles = getBundles(10);
+        asyncRequests(response -> namespaces.createNamespace(response, 
testTenant, setBookieAffinityGroupNs, policies));
+
+        // 2.set bookie affinity group
+        String primaryAffinityGroup = "primary-affinity-group";
+        String secondaryAffinityGroup = "secondary-affinity-group";
+        BookieAffinityGroupData bookieAffinityGroupDataReq =
+                
BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(primaryAffinityGroup)
+                        
.bookkeeperAffinityGroupSecondary(secondaryAffinityGroup).build();
+        namespaces.setBookieAffinityGroup(testTenant, 
setBookieAffinityGroupNs, bookieAffinityGroupDataReq);
+
+        // 3.query namespace num bundles, should be policies.bundles, which we 
set before
+        BundlesData bundlesData = (BundlesData) asyncRequests(
+                response -> namespaces.getBundlesData(response, testTenant, 
setBookieAffinityGroupNs));
+        assertEquals(bundlesData, policies.bundles);
+
+        // 4.assert namespace bookie affinity group
+        BookieAffinityGroupData bookieAffinityGroupDataResp =
+                namespaces.getBookieAffinityGroup(testTenant, 
setBookieAffinityGroupNs);
+        assertEquals(bookieAffinityGroupDataResp, bookieAffinityGroupDataReq);
+    }
+
+    @Test
+    public void testSetNamespaceAntiAffinityGroupWithEmptyPolicies() throws 
Exception {
+        // 1. create namespace with empty policies
+        String setNamespaceAntiAffinityGroupNs = 
"test-set-namespace-anti-affinity-group-ns";
+        asyncRequests(
+                response -> namespaces.createNamespace(response, testTenant, 
setNamespaceAntiAffinityGroupNs, null));
+
+        // 2.set namespace anti affinity group
+        String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
+        namespaces.setNamespaceAntiAffinityGroup(testTenant, 
setNamespaceAntiAffinityGroupNs,
+                namespaceAntiAffinityGroupReq);
+
+        // 3.query namespace num bundles, should be 
conf.getDefaultNumberOfNamespaceBundles()
+        BundlesData bundlesData = (BundlesData) asyncRequests(
+                response -> namespaces.getBundlesData(response, testTenant, 
setNamespaceAntiAffinityGroupNs));
+        assertEquals(bundlesData.getNumBundles(), 
conf.getDefaultNumberOfNamespaceBundles());
+
+        // 4.assert namespace anti affinity group
+        String namespaceAntiAffinityGroupResp =
+                namespaces.getNamespaceAntiAffinityGroup(testTenant, 
setNamespaceAntiAffinityGroupNs);
+        assertEquals(namespaceAntiAffinityGroupResp, 
namespaceAntiAffinityGroupReq);
+    }
+
+    @Test
+    public void testSetNamespaceAntiAffinityGroupWithExistBundlePolicies() 
throws Exception {
+        // 1. create namespace with specified num bundles
+        String setNamespaceAntiAffinityGroupNs = 
"test-set-namespace-anti-affinity-group-ns";
+        Policies policies = new Policies();
+        policies.bundles = getBundles(10);
+        asyncRequests(response -> namespaces.createNamespace(response, 
testTenant, setNamespaceAntiAffinityGroupNs,
+                policies));
+
+        // 2.set namespace anti affinity group
+        String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
+        namespaces.setNamespaceAntiAffinityGroup(testTenant, 
setNamespaceAntiAffinityGroupNs,
+                namespaceAntiAffinityGroupReq);
+
+        // 3.query namespace num bundles, should be policies.bundles, which we 
set before
+        BundlesData bundlesData = (BundlesData) asyncRequests(
+                response -> namespaces.getBundlesData(response, testTenant, 
setNamespaceAntiAffinityGroupNs));
+        assertEquals(bundlesData, policies.bundles);
+
+        // 4.assert namespace anti affinity group
+        String namespaceAntiAffinityGroupResp =
+                namespaces.getNamespaceAntiAffinityGroup(testTenant, 
setNamespaceAntiAffinityGroupNs);
+        assertEquals(namespaceAntiAffinityGroupResp, 
namespaceAntiAffinityGroupReq);
+    }
+
+    @Test
+    public void testEnableMigrationWithEmptyPolicies() throws Exception {
+        // 1. create namespace with empty policies
+        String enableMigrationGroupNs = 
"test-set-namespace-enable-migration-ns";
+        asyncRequests(response -> namespaces.createNamespace(response, 
testTenant, enableMigrationGroupNs, null));
+
+        // 2.set enable migration
+        boolean enableMigrationReq = true;
+        namespaces.enableMigration(testTenant, enableMigrationGroupNs, 
enableMigrationReq);
+
+        // 3.query namespace num bundles, should be 
conf.getDefaultNumberOfNamespaceBundles()
+        BundlesData bundlesData = (BundlesData) asyncRequests(
+                response -> namespaces.getBundlesData(response, testTenant, 
enableMigrationGroupNs));
+        assertEquals(bundlesData.getNumBundles(), 
conf.getDefaultNumberOfNamespaceBundles());
+
+        // 4.assert namespace enable migration
+        Policies policiesResp = (Policies) asyncRequests(
+                response -> namespaces.getPolicies(response, testTenant, 
enableMigrationGroupNs));
+        assertEquals(policiesResp.migrated, enableMigrationReq);
+    }
+
+    @Test
+    public void testEnableMigrationWithExistBundlePolicies() throws Exception {
+        // 1. create namespace with specified num bundles
+        String enableMigrationGroupNs = 
"test-set-namespace-enable-migration-ns";
+        Policies policiesReq = new Policies();
+        policiesReq.bundles = getBundles(10);
+        asyncRequests(
+                response -> namespaces.createNamespace(response, testTenant, 
enableMigrationGroupNs, policiesReq));
+
+        // 2.set enable migration
+        boolean enableMigrationReq = true;
+        namespaces.enableMigration(testTenant, enableMigrationGroupNs, 
enableMigrationReq);
+
+        // 3.query namespace num bundles, should be policies.bundles, which we 
set before
+        BundlesData bundlesData = (BundlesData) asyncRequests(
+                response -> namespaces.getBundlesData(response, testTenant, 
enableMigrationGroupNs));
+        assertEquals(bundlesData, policiesReq.bundles);
+
+        // 4.assert namespace enable migration
+        Policies policiesResp = (Policies) asyncRequests(
+                response -> namespaces.getPolicies(response, testTenant, 
enableMigrationGroupNs));
+        assertEquals(policiesResp.migrated, enableMigrationReq);
+    }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index 5f596cc0488..6861664a1dc 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -286,7 +286,7 @@ public class ExtensibleLoadManagerTest extends 
TestRetrySupport {
     }
 
     @Test(timeOut = 80 * 1000)
-    public void testAntiaffinityPolicy() throws PulsarAdminException {
+    public void testAntiAffinityPolicy() throws PulsarAdminException {
         final String namespaceAntiAffinityGroup = "my-anti-affinity-filter";
         final String antiAffinityEnabledNameSpace = DEFAULT_TENANT + 
"/my-ns-filter" + nsSuffix;
         final int numPartition = 20;
@@ -295,14 +295,25 @@ public class ExtensibleLoadManagerTest extends 
TestRetrySupport {
 
         assertEquals(activeBrokers.size(), NUM_BROKERS);
 
+        Set<String> antiAffinityEnabledNameSpacesReq = new HashSet<>();
         for (int i = 0; i < activeBrokers.size(); i++) {
             String namespace = antiAffinityEnabledNameSpace + "-" + i;
-            admin.namespaces().createNamespace(namespace, 10);
+            antiAffinityEnabledNameSpacesReq.add(namespace);
+            admin.namespaces().createNamespace(namespace, 1);
             admin.namespaces().setNamespaceAntiAffinityGroup(namespace, 
namespaceAntiAffinityGroup);
             admin.clusters().createFailureDomain(clusterName, 
namespaceAntiAffinityGroup, FailureDomain.builder()
                     .brokers(Set.of(activeBrokers.get(i))).build());
+            String namespaceAntiAffinityGroupResp = 
admin.namespaces().getNamespaceAntiAffinityGroup(namespace);
+            assertEquals(namespaceAntiAffinityGroupResp, 
namespaceAntiAffinityGroup);
+            FailureDomain failureDomainResp =
+                    admin.clusters().getFailureDomain(clusterName, 
namespaceAntiAffinityGroup);
+            assertEquals(failureDomainResp.getBrokers(), 
Set.of(activeBrokers.get(i)));
         }
 
+        List<String> antiAffinityNamespacesResp =
+                admin.namespaces().getAntiAffinityNamespaces(DEFAULT_TENANT, 
clusterName, namespaceAntiAffinityGroup);
+        assertEquals(new HashSet<>(antiAffinityNamespacesResp), 
antiAffinityEnabledNameSpacesReq);
+
         Set<String> result = new HashSet<>();
         for (int i = 0; i < activeBrokers.size(); i++) {
             final String topic = "persistent://" + 
antiAffinityEnabledNameSpace + "-" + i +"/topic";

Reply via email to