This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 462c7176a68 [fix][admin] Set local policies overwrites "number of 
bundles" passed during namespace creation (#24762)
462c7176a68 is described below

commit 462c7176a68c0c14a4df75a1858891fdf30e4e5c
Author: Oneby <[email protected]>
AuthorDate: Tue Nov 4 21:46:48 2025 +0800

    [fix][admin] Set local policies overwrites "number of bundles" passed 
during namespace creation (#24762)
    
    Co-authored-by: oneby-wang <[email protected]>
    (cherry picked from commit 3983ff012dbf97a656dd99f68b63b79bdf6f1602)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  26 +++++-
 .../pulsar/broker/admin/NamespacesV2Test.java      | 104 +++++++++++++++++++++
 .../loadbalance/ExtensibleLoadManagerTest.java     |  15 ++-
 3 files changed, 140 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0e8d5febb9c..c3974cbaa34 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.broker.admin.impl;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
-import static 
org.apache.pulsar.common.policies.data.PoliciesUtil.defaultBundle;
 import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
@@ -909,7 +908,7 @@ public abstract class NamespacesBase extends AdminResource {
                         policies -> new LocalPolicies(policies.bundles,
                                 bookieAffinityGroup,
                                 policies.namespaceAntiAffinityGroup))
-                        .orElseGet(() -> new 
LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
+                        .orElseGet(() -> new 
LocalPolicies(getDefaultBundleData(),
                                 bookieAffinityGroup,
                                 null));
                 log.info("[{}] Successfully updated local-policies 
configuration: namespace={}, map={}", clientAppId(),
@@ -920,6 +919,8 @@ public abstract class NamespacesBase extends AdminResource {
             log.warn("[{}] Failed to update local-policy configuration for 
namespace {}: does not exist", clientAppId(),
                     namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not 
exist");
+        } catch (RestException re) {
+            throw re;
         } catch (Exception e) {
             log.error("[{}] Failed to update local-policy configuration for 
namespace {}", clientAppId(), namespaceName,
                     e);
@@ -1735,11 +1736,13 @@ public abstract class NamespacesBase extends 
AdminResource {
                 lp.map(policies -> new LocalPolicies(policies.bundles,
                         policies.bookieAffinityGroup,
                         antiAffinityGroup))
-                        .orElseGet(() -> new LocalPolicies(defaultBundle(),
+                        .orElseGet(() -> new 
LocalPolicies(getDefaultBundleData(),
                                 null, antiAffinityGroup))
             );
             log.info("[{}] Successfully updated local-policies configuration: 
namespace={}, map={}", clientAppId(),
                     namespaceName, antiAffinityGroup);
+        } catch (RestException re) {
+            throw re;
         } catch (Exception e) {
             log.error("[{}] Failed to update local-policy configuration for 
namespace {}", clientAppId(), namespaceName,
                     e);
@@ -2785,5 +2788,22 @@ public abstract class NamespacesBase extends 
AdminResource {
                 .thenApply(policies -> policies.allowed_clusters);
     }
 
+    // TODO remove this sync method after async refactor
+    private BundlesData getDefaultBundleData() {
+        try {
+            return 
getDefaultBundleDataAsync().get(config().getMetadataStoreOperationTimeoutSeconds(),
+                    TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.error("[{}] Failed to get namespace-policy configuration for 
namespace {}", clientAppId(),
+                    namespaceName, e);
+            throw new RestException(e);
+        }
+    }
+
+    private CompletableFuture<BundlesData> getDefaultBundleDataAsync() {
+        return namespaceResources().getPoliciesAsync(namespaceName).thenApply(
+                optionalPolicies -> optionalPolicies.isPresent() ? 
optionalPolicies.get().bundles :
+                        
getBundles(config().getDefaultNumberOfNamespaceBundles()));
+    }
 
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
index cec30762194..0e4ede7ae30 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -38,8 +39,11 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
+import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PolicyName;
 import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.PublishRate;
@@ -196,4 +200,104 @@ public class NamespacesV2Test extends 
MockedPulsarServiceBaseTest {
                 this.testTenant, this.testNamespace));
         assertTrue(Objects.isNull(dispatchRate));
     }
+
+    @Test
+    public void testSetBookieAffinityGroupWithEmptyPolicies() throws Exception 
{
+        // 1. create namespace with empty policies
+        String setBookieAffinityGroupNs = "test-set-bookie-affinity-group-ns";
+        asyncRequests(response -> namespaces.createNamespace(response, 
testTenant, setBookieAffinityGroupNs, null));
+
+        // 2.set bookie affinity group
+        String primaryAffinityGroup = "primary-affinity-group";
+        String secondaryAffinityGroup = "secondary-affinity-group";
+        BookieAffinityGroupData bookieAffinityGroupDataReq =
+                
BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(primaryAffinityGroup)
+                        
.bookkeeperAffinityGroupSecondary(secondaryAffinityGroup).build();
+        namespaces.setBookieAffinityGroup(testTenant, 
setBookieAffinityGroupNs, bookieAffinityGroupDataReq);
+
+        // 3.query namespace num bundles, should be 
conf.getDefaultNumberOfNamespaceBundles()
+        BundlesData bundlesData = (BundlesData) asyncRequests(
+                response -> namespaces.getBundlesData(response, testTenant, 
setBookieAffinityGroupNs));
+        assertEquals(bundlesData.getNumBundles(), 
conf.getDefaultNumberOfNamespaceBundles());
+
+        // 4.assert namespace bookie affinity group
+        BookieAffinityGroupData bookieAffinityGroupDataResp =
+                namespaces.getBookieAffinityGroup(testTenant, 
setBookieAffinityGroupNs);
+        assertEquals(bookieAffinityGroupDataResp, bookieAffinityGroupDataReq);
+    }
+
+    @Test
+    public void testSetBookieAffinityGroupWithExistBundlePolicies() throws 
Exception {
+        // 1. create namespace with specified num bundles
+        String setBookieAffinityGroupNs = "test-set-bookie-affinity-group-ns";
+        Policies policies = new Policies();
+        policies.bundles = getBundles(10);
+        asyncRequests(response -> namespaces.createNamespace(response, 
testTenant, setBookieAffinityGroupNs, policies));
+
+        // 2.set bookie affinity group
+        String primaryAffinityGroup = "primary-affinity-group";
+        String secondaryAffinityGroup = "secondary-affinity-group";
+        BookieAffinityGroupData bookieAffinityGroupDataReq =
+                
BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(primaryAffinityGroup)
+                        
.bookkeeperAffinityGroupSecondary(secondaryAffinityGroup).build();
+        namespaces.setBookieAffinityGroup(testTenant, 
setBookieAffinityGroupNs, bookieAffinityGroupDataReq);
+
+        // 3.query namespace num bundles, should be policies.bundles, which we 
set before
+        BundlesData bundlesData = (BundlesData) asyncRequests(
+                response -> namespaces.getBundlesData(response, testTenant, 
setBookieAffinityGroupNs));
+        assertEquals(bundlesData, policies.bundles);
+
+        // 4.assert namespace bookie affinity group
+        BookieAffinityGroupData bookieAffinityGroupDataResp =
+                namespaces.getBookieAffinityGroup(testTenant, 
setBookieAffinityGroupNs);
+        assertEquals(bookieAffinityGroupDataResp, bookieAffinityGroupDataReq);
+    }
+
+    @Test
+    public void testSetNamespaceAntiAffinityGroupWithEmptyPolicies() throws 
Exception {
+        // 1. create namespace with empty policies
+        String setNamespaceAntiAffinityGroupNs = 
"test-set-namespace-anti-affinity-group-ns";
+        asyncRequests(
+                response -> namespaces.createNamespace(response, testTenant, 
setNamespaceAntiAffinityGroupNs, null));
+
+        // 2.set namespace anti affinity group
+        String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
+        namespaces.setNamespaceAntiAffinityGroup(testTenant, 
setNamespaceAntiAffinityGroupNs,
+                namespaceAntiAffinityGroupReq);
+
+        // 3.query namespace num bundles, should be 
conf.getDefaultNumberOfNamespaceBundles()
+        BundlesData bundlesData = (BundlesData) asyncRequests(
+                response -> namespaces.getBundlesData(response, testTenant, 
setNamespaceAntiAffinityGroupNs));
+        assertEquals(bundlesData.getNumBundles(), 
conf.getDefaultNumberOfNamespaceBundles());
+
+        // 4.assert namespace anti affinity group
+        String namespaceAntiAffinityGroupResp =
+                namespaces.getNamespaceAntiAffinityGroup(testTenant, 
setNamespaceAntiAffinityGroupNs);
+        assertEquals(namespaceAntiAffinityGroupResp, 
namespaceAntiAffinityGroupReq);
+    }
+
+    @Test
+    public void testSetNamespaceAntiAffinityGroupWithExistBundlePolicies() 
throws Exception {
+        // 1. create namespace with specified num bundles
+        String setNamespaceAntiAffinityGroupNs = 
"test-set-namespace-anti-affinity-group-ns";
+        Policies policies = new Policies();
+        policies.bundles = getBundles(10);
+        asyncRequests(response -> namespaces.createNamespace(response, 
testTenant, setNamespaceAntiAffinityGroupNs,
+                policies));
+
+        // 2.set namespace anti affinity group
+        String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
+        namespaces.setNamespaceAntiAffinityGroup(testTenant, 
setNamespaceAntiAffinityGroupNs,
+                namespaceAntiAffinityGroupReq);
+
+        // 3.query namespace num bundles, should be policies.bundles, which we 
set before
+        BundlesData bundlesData = (BundlesData) asyncRequests(
+                response -> namespaces.getBundlesData(response, testTenant, 
setNamespaceAntiAffinityGroupNs));
+        assertEquals(bundlesData, policies.bundles);
+
+        // 4.assert namespace anti affinity group
+        String namespaceAntiAffinityGroupResp =
+                namespaces.getNamespaceAntiAffinityGroup(testTenant, 
setNamespaceAntiAffinityGroupNs);
+        assertEquals(namespaceAntiAffinityGroupResp, 
namespaceAntiAffinityGroupReq);
+    }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index 6da4c739126..13ad557c197 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -288,7 +288,7 @@ public class ExtensibleLoadManagerTest extends 
TestRetrySupport {
     }
 
     @Test(timeOut = 40 * 1000)
-    public void testAntiaffinityPolicy() throws PulsarAdminException {
+    public void testAntiAffinityPolicy() throws PulsarAdminException {
         final String namespaceAntiAffinityGroup = "my-anti-affinity-filter";
         final String antiAffinityEnabledNameSpace = DEFAULT_TENANT + 
"/my-ns-filter" + nsSuffix;
         final int numPartition = 20;
@@ -297,14 +297,25 @@ public class ExtensibleLoadManagerTest extends 
TestRetrySupport {
 
         assertEquals(activeBrokers.size(), NUM_BROKERS);
 
+        Set<String> antiAffinityEnabledNameSpacesReq = new HashSet<>();
         for (int i = 0; i < activeBrokers.size(); i++) {
             String namespace = antiAffinityEnabledNameSpace + "-" + i;
-            admin.namespaces().createNamespace(namespace, 10);
+            antiAffinityEnabledNameSpacesReq.add(namespace);
+            admin.namespaces().createNamespace(namespace, 1);
             admin.namespaces().setNamespaceAntiAffinityGroup(namespace, 
namespaceAntiAffinityGroup);
             admin.clusters().createFailureDomain(clusterName, 
namespaceAntiAffinityGroup, FailureDomain.builder()
                     .brokers(Set.of(activeBrokers.get(i))).build());
+            String namespaceAntiAffinityGroupResp = 
admin.namespaces().getNamespaceAntiAffinityGroup(namespace);
+            assertEquals(namespaceAntiAffinityGroupResp, 
namespaceAntiAffinityGroup);
+            FailureDomain failureDomainResp =
+                    admin.clusters().getFailureDomain(clusterName, 
namespaceAntiAffinityGroup);
+            assertEquals(failureDomainResp.getBrokers(), 
Set.of(activeBrokers.get(i)));
         }
 
+        List<String> antiAffinityNamespacesResp =
+                admin.namespaces().getAntiAffinityNamespaces(DEFAULT_TENANT, 
clusterName, namespaceAntiAffinityGroup);
+        assertEquals(new HashSet<>(antiAffinityNamespacesResp), 
antiAffinityEnabledNameSpacesReq);
+
         Set<String> result = new HashSet<>();
         for (int i = 0; i < activeBrokers.size(); i++) {
             final String topic = "persistent://" + 
antiAffinityEnabledNameSpace + "-" + i +"/topic";

Reply via email to