This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 31d59cfa080 [branch-2.10][fix][broker] Avoid infinite bundle unloading
(#20822) (#20877)
31d59cfa080 is described below
commit 31d59cfa080d6fba89e2f7074faa1d5c5fe973f9
Author: Kai Wang <[email protected]>
AuthorDate: Tue Aug 8 14:41:49 2023 +0800
[branch-2.10][fix][broker] Avoid infinite bundle unloading (#20822) (#20877)
---
.../broker/loadbalance/impl/LoadManagerShared.java | 6 +
.../loadbalance/impl/ModularLoadManagerImpl.java | 170 ++++++++++++---------
.../{ => impl}/ModularLoadManagerImplTest.java | 62 ++++++--
3 files changed, 155 insertions(+), 83 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index e0127bd3864..26c31a6e8a9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -554,4 +554,10 @@ public class LoadManagerShared {
brokerCandidateCache.addAll(filteredBrokerCandidates);
}
}
+
+ public static NamespaceBundle getNamespaceBundle(PulsarService pulsar,
String bundle) {
+ final String namespaceName =
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+ final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundle);
+ return
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName,
bundleRange);
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 07408305289..301f19213c3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
@@ -642,9 +643,21 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
if (!shouldAntiAffinityNamespaceUnload(namespaceName,
bundleRange, broker)) {
return;
}
+ NamespaceBundle bundleToUnload =
LoadManagerShared.getNamespaceBundle(pulsar, bundle);
+ Optional<String> destBroker =
this.selectBroker(bundleToUnload);
+ if (!destBroker.isPresent()) {
+ log.info("[{}] No broker available to unload bundle {}
from broker {}",
+ strategy.getClass().getSimpleName(), bundle,
broker);
+ return;
+ }
+ if (destBroker.get().equals(broker)) {
+ log.warn("[{}] The destination broker {} is the same
as the current owner broker for Bundle {}",
+ strategy.getClass().getSimpleName(),
destBroker.get(), bundle);
+ return;
+ }
- log.info("[{}] Unloading bundle: {} from broker {}",
- strategy.getClass().getSimpleName(), bundle,
broker);
+ log.info("[{}] Unloading bundle: {} from broker {} to dest
broker {}",
+ strategy.getClass().getSimpleName(), bundle,
broker, destBroker.get());
try {
pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName,
bundleRange);
loadData.getRecentlyUnloadedBundles().put(bundle,
System.currentTimeMillis());
@@ -799,16 +812,56 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
// If the given bundle is already in preallocated, return
the selected broker.
return Optional.of(preallocatedBundleToBroker.get(bundle));
}
- final BundleData data =
loadData.getBundleData().computeIfAbsent(bundle,
- key -> getBundleDataOrDefault(bundle));
- brokerCandidateCache.clear();
- LoadManagerShared.applyNamespacePolicies(serviceUnit,
policies, brokerCandidateCache,
- getAvailableBrokers(),
- brokerTopicLoadingPredicate);
- // filter brokers which owns topic higher than threshold
-
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache,
loadData,
- conf.getLoadBalancerBrokerMaxTopics());
+ Optional<String> broker = selectBroker(serviceUnit);
+ if (!broker.isPresent()) {
+ // If no broker is selected, return empty.
+ return broker;
+ }
+ // Add new bundle to preallocated.
+ preallocateBundle(bundle, broker.get());
+ return broker;
+ }
+ } finally {
+ selectBrokerForAssignment.observe(System.nanoTime() - startTime,
TimeUnit.NANOSECONDS);
+ }
+ }
+
+ private void preallocateBundle(String bundle, String broker) {
+ final BundleData data =
loadData.getBundleData().computeIfAbsent(bundle,
+ key -> getBundleDataOrDefault(bundle));
+
loadData.getBrokerData().get(broker).getPreallocatedBundleData().put(bundle,
data);
+ preallocatedBundleToBroker.put(bundle, broker);
+
+ final String namespaceName =
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+ final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundle);
+ final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>
namespaceToBundleRange =
+ brokerToNamespaceToBundleRange
+ .computeIfAbsent(broker,
+ k -> ConcurrentOpenHashMap.<String,
+
ConcurrentOpenHashSet<String>>newBuilder()
+ .build());
+ synchronized (namespaceToBundleRange) {
+ namespaceToBundleRange.computeIfAbsent(namespaceName,
+ k -> ConcurrentOpenHashSet.<String>newBuilder().build())
+ .add(bundleRange);
+ }
+ }
+
+ @VisibleForTesting
+ Optional<String> selectBroker(final ServiceUnitId serviceUnit) {
+ synchronized (brokerCandidateCache) {
+ final String bundle = serviceUnit.toString();
+ final BundleData data =
loadData.getBundleData().computeIfAbsent(bundle,
+ key -> getBundleDataOrDefault(bundle));
+ brokerCandidateCache.clear();
+ LoadManagerShared.applyNamespacePolicies(serviceUnit, policies,
brokerCandidateCache,
+ getAvailableBrokers(),
+ brokerTopicLoadingPredicate);
+
+ // filter brokers which owns topic higher than threshold
+
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache,
loadData,
+ conf.getLoadBalancerBrokerMaxTopics());
// distribute namespaces to domain and brokers according to
anti-affinity-group
LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar,
serviceUnit.toString(),
@@ -820,71 +873,50 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
brokerToNamespaceToBundleRange);
log.info("{} brokers being considered for assignment of {}",
brokerCandidateCache.size(), bundle);
- // Use the filter pipeline to finalize broker candidates.
- try {
- for (BrokerFilter filter : filterPipeline) {
- filter.filter(brokerCandidateCache, data, loadData,
conf);
- }
- } catch (BrokerFilterException x) {
- // restore the list of brokers to the full set
- LoadManagerShared.applyNamespacePolicies(serviceUnit,
policies, brokerCandidateCache,
- getAvailableBrokers(),
- brokerTopicLoadingPredicate);
- }
-
- if (brokerCandidateCache.isEmpty()) {
- // restore the list of brokers to the full set
- LoadManagerShared.applyNamespacePolicies(serviceUnit,
policies, brokerCandidateCache,
- getAvailableBrokers(),
- brokerTopicLoadingPredicate);
+ // Use the filter pipeline to finalize broker candidates.
+ try {
+ for (BrokerFilter filter : filterPipeline) {
+ filter.filter(brokerCandidateCache, data, loadData, conf);
}
+ } catch (BrokerFilterException x) {
+ // restore the list of brokers to the full set
+ LoadManagerShared.applyNamespacePolicies(serviceUnit,
policies, brokerCandidateCache,
+ getAvailableBrokers(),
+ brokerTopicLoadingPredicate);
+ }
- // Choose a broker among the potentially smaller filtered
list, when possible
- Optional<String> broker =
placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
- if (log.isDebugEnabled()) {
- log.debug("Selected broker {} from candidate brokers {}",
broker, brokerCandidateCache);
- }
+ if (brokerCandidateCache.isEmpty()) {
+ // restore the list of brokers to the full set
+ LoadManagerShared.applyNamespacePolicies(serviceUnit,
policies, brokerCandidateCache,
+ getAvailableBrokers(),
+ brokerTopicLoadingPredicate);
+ }
- if (!broker.isPresent()) {
- // No brokers available
- return broker;
- }
+ // Choose a broker among the potentially smaller filtered list,
when possible
+ Optional<String> broker =
placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
+ if (log.isDebugEnabled()) {
+ log.debug("Selected broker {} from candidate brokers {}",
broker, brokerCandidateCache);
+ }
- final double overloadThreshold =
conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
- final double maxUsage =
loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
- if (maxUsage > overloadThreshold) {
- // All brokers that were in the filtered list were
overloaded, so check if there is a better broker
- LoadManagerShared.applyNamespacePolicies(serviceUnit,
policies, brokerCandidateCache,
- getAvailableBrokers(),
- brokerTopicLoadingPredicate);
- Optional<String> brokerTmp =
-
placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
- if (brokerTmp.isPresent()) {
- broker = brokerTmp;
- }
- }
+ if (!broker.isPresent()) {
+ // No brokers available
+ return broker;
+ }
- // Add new bundle to preallocated.
-
loadData.getBrokerData().get(broker.get()).getPreallocatedBundleData().put(bundle,
data);
- preallocatedBundleToBroker.put(bundle, broker.get());
-
- final String namespaceName =
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
- final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundle);
- final ConcurrentOpenHashMap<String,
ConcurrentOpenHashSet<String>> namespaceToBundleRange =
- brokerToNamespaceToBundleRange
- .computeIfAbsent(broker.get(),
- k -> ConcurrentOpenHashMap.<String,
-
ConcurrentOpenHashSet<String>>newBuilder()
- .build());
- synchronized (namespaceToBundleRange) {
- namespaceToBundleRange.computeIfAbsent(namespaceName,
- k ->
ConcurrentOpenHashSet.<String>newBuilder().build())
- .add(bundleRange);
+ final double overloadThreshold =
conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+ final double maxUsage =
loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
+ if (maxUsage > overloadThreshold) {
+ // All brokers that were in the filtered list were overloaded,
so check if there is a better broker
+ LoadManagerShared.applyNamespacePolicies(serviceUnit,
policies, brokerCandidateCache,
+ getAvailableBrokers(),
+ brokerTopicLoadingPredicate);
+ Optional<String> brokerTmp =
+ placementStrategy.selectBroker(brokerCandidateCache,
data, loadData, conf);
+ if (brokerTmp.isPresent()) {
+ broker = brokerTmp;
}
- return broker;
}
- } finally {
- selectBrokerForAssignment.observe(System.nanoTime() - startTime,
TimeUnit.NANOSECONDS);
+ return broker;
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
similarity index 91%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
rename to
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index 6b5898f4174..dae160b1421 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -16,9 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.loadbalance;
+package org.apache.pulsar.broker.loadbalance.impl;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -51,12 +53,12 @@ import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadBalancerTestingUtils;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.TimeAverageMessageData;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
-import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
-import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
-import
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -329,38 +331,70 @@ public class ModularLoadManagerImplTest {
bundleReference.set(invocation.getArguments()[0].toString() + '/'
+ invocation.getArguments()[1]);
return null;
}).when(namespacesSpy1).unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString());
+
+ AtomicReference<Optional<String>> selectedBrokerRef = new
AtomicReference<>();
+ ModularLoadManagerImpl primaryLoadManagerSpy = spy(primaryLoadManager);
+ doAnswer(invocation -> {
+ ServiceUnitId serviceUnitId = (ServiceUnitId)
invocation.getArguments()[0];
+ Optional<String> broker =
primaryLoadManager.selectBroker(serviceUnitId);
+ selectedBrokerRef.set(broker);
+ return broker;
+ }).when(primaryLoadManagerSpy).selectBroker(any());
+
setField(pulsar1.getAdminClient(), "namespaces", namespacesSpy1);
pulsar1.getConfiguration().setLoadBalancerEnabled(true);
- final LoadData loadData = (LoadData) getField(primaryLoadManager,
"loadData");
+ final LoadData loadData = (LoadData) getField(primaryLoadManagerSpy,
"loadData");
final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
final BrokerData brokerDataSpy1 = spy(brokerDataMap.get(primaryHost));
when(brokerDataSpy1.getLocalData()).thenReturn(localBrokerData);
brokerDataMap.put(primaryHost, brokerDataSpy1);
// Need to update all the bundle data for the shredder to see the spy.
- primaryLoadManager.handleDataNotification(new
Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT +
"/broker:8080"));
+ primaryLoadManagerSpy.handleDataNotification(new
Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT +
"/broker:8080"));
Thread.sleep(100);
localBrokerData.setCpu(new ResourceUsage(80, 100));
- primaryLoadManager.doLoadShedding();
+ primaryLoadManagerSpy.doLoadShedding();
// 80% is below overload threshold: verify nothing is unloaded.
- verify(namespacesSpy1,
Mockito.times(0)).unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString());
+ verify(namespacesSpy1, Mockito.times(0))
+ .unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString());
localBrokerData.setCpu(new ResourceUsage(90, 100));
- primaryLoadManager.doLoadShedding();
+ primaryLoadManagerSpy.doLoadShedding();
// Most expensive bundle will be unloaded.
- verify(namespacesSpy1,
Mockito.times(1)).unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString());
+ verify(namespacesSpy1, Mockito.times(1))
+ .unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString());
assertEquals(bundleReference.get(), mockBundleName(2));
+ assertEquals(selectedBrokerRef.get().get(), secondaryHost);
- primaryLoadManager.doLoadShedding();
+ primaryLoadManagerSpy.doLoadShedding();
// Now less expensive bundle will be unloaded (normally other bundle
would move off and nothing would be
// unloaded, but this is not the case due to the spy's behavior).
- verify(namespacesSpy1,
Mockito.times(2)).unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString());
+ verify(namespacesSpy1, Mockito.times(2))
+ .unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString());
assertEquals(bundleReference.get(), mockBundleName(1));
+ assertEquals(selectedBrokerRef.get().get(), secondaryHost);
- primaryLoadManager.doLoadShedding();
+ primaryLoadManagerSpy.doLoadShedding();
// Now both are in grace period: neither should be unloaded.
- verify(namespacesSpy1,
Mockito.times(2)).unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString());
+ verify(namespacesSpy1, Mockito.times(2))
+ .unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString());
+ assertEquals(selectedBrokerRef.get().get(), secondaryHost);
+
+ // Test bundle transfer to same broker
+
+ loadData.getRecentlyUnloadedBundles().clear();
+ primaryLoadManagerSpy.doLoadShedding();
+ verify(namespacesSpy1, Mockito.times(3))
+ .unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString());
+
+
doReturn(Optional.of(primaryHost)).when(primaryLoadManagerSpy).selectBroker(any());
+ loadData.getRecentlyUnloadedBundles().clear();
+ primaryLoadManagerSpy.doLoadShedding();
+ // The bundle shouldn't be unloaded because the broker is the same.
+ verify(namespacesSpy1, Mockito.times(3))
+ .unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString());
+
}
// Test that ModularLoadManagerImpl will determine that writing local data
to ZooKeeper is necessary if certain