This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d02653abd5541a44887c806abf586715f56cb33d Author: Shoothzj <[email protected]> AuthorDate: Wed Sep 8 23:28:13 2021 +0800 Avoid to infinitely split bundle (#11937) (cherry picked from commit bef3757029eb3e48a490a02871689ab3e6abdfa5) --- .../loadbalance/impl/BundleSplitterTask.java | 8 +- .../loadbalance/impl/ModularLoadManagerImpl.java | 2 +- .../loadbalance/impl/BundleSplitterTaskTest.java | 105 +++++++++++++++++++++ .../pulsar/client/api/BrokerServiceLookupTest.java | 15 ++- 4 files changed, 122 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java index bb1f990..e81fb50 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java @@ -42,10 +42,8 @@ public class BundleSplitterTask implements BundleSplitStrategy { /** * Construct a BundleSplitterTask. * - * @param pulsar - * Service to construct from. */ - public BundleSplitterTask(final PulsarService pulsar) { + public BundleSplitterTask() { bundleCache = new HashSet<>(); } @@ -74,6 +72,10 @@ public class BundleSplitterTask implements BundleSplitStrategy { for (final Map.Entry<String, NamespaceBundleStats> entry : localData.getLastStats().entrySet()) { final String bundle = entry.getKey(); final NamespaceBundleStats stats = entry.getValue(); + if (stats.topics == 1) { + log.info("namespace bundle {} only have 1 topic", bundle); + continue; + } double totalMessageRate = 0; double totalMessageThroughput = 0; // Attempt to consider long-term message data, otherwise effectively ignore. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 5b7867e..5035b00 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -248,7 +248,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, Consumer<Noti brokerHostUsage = new GenericBrokerHostUsageImpl(pulsar); } - bundleSplitStrategy = new BundleSplitterTask(pulsar); + bundleSplitStrategy = new BundleSplitterTask(); conf = pulsar.getConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java new file mode 100644 index 0000000..7480989 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.impl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerData; +import org.apache.pulsar.broker.BundleData; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.TimeAverageMessageData; +import org.apache.pulsar.broker.loadbalance.LoadData; +import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; +import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * @author hezhangjian + */ +@Slf4j +@Test(groups = "broker") +public class BundleSplitterTaskTest { + + private LocalBookkeeperEnsemble bkEnsemble; + + private PulsarService pulsar; + + @BeforeMethod + void setup() throws Exception { + // Start local bookkeeper ensemble + bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble.start(); + // Start broker + ServiceConfiguration config = new ServiceConfiguration(); + config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); + config.setClusterName("use"); + config.setWebServicePort(Optional.of(0)); + config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); + + config.setAdvertisedAddress("localhost"); + config.setBrokerShutdownTimeoutMs(0L); + config.setBrokerServicePort(Optional.of(0)); + config.setBrokerServicePortTls(Optional.of(0)); + config.setWebServicePortTls(Optional.of(0)); + pulsar = new PulsarService(config); + pulsar.start(); + } + + @Test + public void testSplitTaskWhenTopicJustOne() { + final BundleSplitterTask bundleSplitterTask = new BundleSplitterTask(); + LoadData loadData = new LoadData(); + + LocalBrokerData brokerData = new LocalBrokerData(); + Map<String, NamespaceBundleStats> lastStats = new HashMap<>(); + final NamespaceBundleStats namespaceBundleStats = new NamespaceBundleStats(); + namespaceBundleStats.topics = 1; + lastStats.put("ten/ns/0x00000000_0x80000000", namespaceBundleStats); + brokerData.setLastStats(lastStats); + loadData.getBrokerData().put("broker", new BrokerData(brokerData)); + + BundleData bundleData = new BundleData(); + TimeAverageMessageData averageMessageData = new TimeAverageMessageData(); + averageMessageData.setMsgRateIn(pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate()); + averageMessageData.setMsgRateOut(1); + bundleData.setLongTermData(averageMessageData); + loadData.getBundleData().put("ten/ns/0x00000000_0x80000000", bundleData); + + final Set<String> bundlesToSplit = bundleSplitterTask.findBundlesToSplit(loadData, pulsar); + Assert.assertEquals(bundlesToSplit.size(), 0); + } + + + @AfterMethod(alwaysRun = true) + void shutdown() throws Exception { + log.info("--- Shutting down ---"); + pulsar.close(); + bkEnsemble.stop(); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index cb349bf..36ff448 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -1019,6 +1019,13 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { Consumer<byte[]> consumer1 = pulsarClient2.newConsumer().topic(topic1) .subscriptionName("my-subscriber-name").subscribe(); + // there should be more than one topic to trigger split + final String topic2 = "persistent://" + namespace + "/topic2"; + @Cleanup + Consumer<byte[]> consumer2 = pulsarClient2.newConsumer().topic(topic2) + .subscriptionName("my-subscriber-name") + .subscribe(); + // (4) Broker-1 will own topic-1 final String unsplitBundle = namespace + "/0x00000000_0xffffffff"; @@ -1054,7 +1061,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { updateAllMethod.invoke(loadManager); conf2.setLoadBalancerAutoBundleSplitEnabled(true); conf2.setLoadBalancerAutoUnloadSplitBundlesEnabled(true); - conf2.setLoadBalancerNamespaceBundleMaxTopics(0); + conf2.setLoadBalancerNamespaceBundleMaxTopics(1); loadManager.checkNamespaceBundleSplit(); // (6) Broker-2 should get the watch and update bundle cache @@ -1063,15 +1070,15 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { }); // (7) Make lookup request again to Broker-2 which should succeed. - final String topic2 = "persistent://" + namespace + "/topic2"; + final String topic3 = "persistent://" + namespace + "/topic3"; @Cleanup - Consumer<byte[]> consumer2 = pulsarClient2.newConsumer().topic(topic2) + Consumer<byte[]> consumer3 = pulsarClient2.newConsumer().topic(topic3) .subscriptionName("my-subscriber-name") .subscribe(); Awaitility.await().untilAsserted(() -> { NamespaceBundle bundleInBroker1AfterSplit = pulsar2.getNamespaceService() - .getBundle(TopicName.get(topic2)); + .getBundle(TopicName.get(topic3)); assertNotEquals(bundleInBroker1AfterSplit.toString(), unsplitBundle); }); } finally {
