This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 38807b1511b [fix][broker] Avoid split non-existent bundle (#25031)
38807b1511b is described below
commit 38807b1511ba3b8c150d69c16a0c3ae36f321dac
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Jan 14 20:41:01 2026 +0800
[fix][broker] Avoid split non-existent bundle (#25031)
---
.../loadbalance/impl/ModularLoadManagerImpl.java | 21 ++++++++-
.../pulsar/common/naming/NamespaceBundles.java | 2 +-
.../impl/ModularLoadManagerImplTest.java | 54 ++++++++++++++++++++++
3 files changed, 74 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 75c60e26879..a9d7ddd78e0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -67,6 +67,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.ResourceQuota;
@@ -368,6 +369,16 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
return future;
}
+ private boolean checkBundleDataExistInNamespaceBundles(NamespaceBundles
namespaceBundles,
+ NamespaceBundle
bundleRange) {
+ try {
+ namespaceBundles.validateBundle(bundleRange);
+ return true;
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ }
+
// Attempt to local the data for the given bundle in metadata store
// If it cannot be found, return the default bundle data.
@Override
@@ -762,8 +773,14 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
try {
final String namespaceName =
LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundleName);
- if (!namespaceBundleFactory
-
.canSplitBundle(namespaceBundleFactory.getBundle(namespaceName, bundleRange))) {
+ NamespaceBundle bundle =
namespaceBundleFactory.getBundle(namespaceName, bundleRange);
+ if (!namespaceBundleFactory.canSplitBundle(bundle)) {
+ continue;
+ }
+
+ NamespaceBundles bundles =
namespaceBundleFactory.getBundles(NamespaceName.get(namespaceName));
+ if (!checkBundleDataExistInNamespaceBundles(bundles,
bundle)) {
+ log.warn("Bundle {} has been removed, skip split this
bundle ", bundleName);
continue;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
index 27c73edc6b5..c298eb8aa36 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
@@ -106,7 +106,7 @@ public class NamespaceBundles {
return bundles.size();
}
- public void validateBundle(NamespaceBundle nsBundle) throws Exception {
+ public void validateBundle(NamespaceBundle nsBundle) throws
IllegalArgumentException {
int idx = Arrays.binarySearch(partitions, nsBundle.getLowerEndpoint());
checkArgument(idx >= 0, "Cannot find bundle %s in the bundles list",
nsBundle);
NamespaceBundle foundBundle = bundles.get(idx);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index ad07dbfa217..577a8c19485 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -40,6 +40,7 @@ import com.google.common.hash.Hashing;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URL;
+import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -70,6 +71,7 @@ import
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLo
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -1130,4 +1132,56 @@ public class ModularLoadManagerImplTest {
assertFalse(bundlesAfterSplit.contains(bundleWillBeSplit.getBundleRange()));
}
+ @Test
+ public void testRepeatSplitBundle() throws Exception {
+ final String cluster = "use";
+ final String tenant = "my-tenant";
+ final String namespace = "repeat-split-bundle";
+ final String topicName = tenant + "/" + namespace + "/" + "topic";
+ int bundleNumbers = 8;
+
+ admin1.clusters().createCluster(cluster, ClusterData.builder()
+ .serviceUrl(pulsar1.getWebServiceAddress()).build());
+ admin1.tenants().createTenant(tenant,
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
Sets.newHashSet(cluster)));
+ admin1.namespaces().createNamespace(tenant + "/" + namespace,
bundleNumbers);
+
+ LoadData loadData = (LoadData) getField(primaryLoadManager,
"loadData");
+ LocalBrokerData localData = (LocalBrokerData)
getField(primaryLoadManager, "localData");
+
+ @Cleanup
+ PulsarClient pulsarClient =
PulsarClient.builder().serviceUrl(pulsar1.getBrokerServiceUrl()).build();
+
+ // create a lot of topic to fully distributed among bundles.
+ List<Consumer> consumers = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ String topicNameI = topicName + i;
+ admin1.topics().createPartitionedTopic(topicNameI, 20);
+ // trigger bundle assignment
+
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicNameI)
+ .subscriptionName("my-subscriber-name2").subscribe();
+ consumers.add(consumer);
+ }
+
+ String topicToFindBundle = topicName + 0;
+ NamespaceBundle realBundle =
pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle));
+ String bundleKey = realBundle.toString();
+ log.info("Before bundle={}", bundleKey);
+
+ NamespaceBundleStats stats = new NamespaceBundleStats();
+ stats.msgRateIn = 100000.0;
+ localData.getLastStats().put(bundleKey, stats);
+ pulsar1.getBrokerService().updateRates();
+
+ primaryLoadManager.updateAll();
+
+ primaryLoadManager.updateAll();
+ Assert.assertFalse(loadData.getBundleData().containsKey(bundleKey));
+
+ for (Consumer consumer : consumers) {
+ consumer.close();
+ }
+ }
+
}