This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 89e4d46f4a5585ac126a03aa298020203100ad71 Author: lifepuzzlefun <wjl_is_...@163.com> AuthorDate: Sun Apr 23 11:27:48 2023 +0800 [fix] [broker] Make `LeastResourceUsageWithWeight` thread safe (#20159) Fix #20160 ### Motivation LeastResourceUsageWithWeight is an stateful object and current code will be accessed by multithread. thread 1: is execute https://github.com/apache/pulsar/blob/2b41e4eafb1cba0e548dd90df60e8cdbb24cd490/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java#L89-L91 and thread 2: is execute https://github.com/apache/pulsar/blob/2b41e4eafb1cba0e548dd90df60e8cdbb24cd490/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java#L147-L150 so an IndexOutOfBound occurs. ### Modifications change the state to `ThreadLocal` (cherry picked from commit 963260abfa142b8cf9ffe372d85d470a78c17235) --- .../strategy/LeastResourceUsageWithWeight.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java index 902cfdaf73f..98986d84b98 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.concurrent.ThreadSafe; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; @@ -35,14 +36,15 @@ import org.apache.pulsar.common.naming.ServiceUnitId; * cause cluster fluctuations due to short-term load jitter. */ @Slf4j +@ThreadSafe public class LeastResourceUsageWithWeight implements BrokerSelectionStrategy { // Maintain this list to reduce object creation. - private final ArrayList<String> bestBrokers; - private final Set<String> noLoadDataBrokers; + private final ThreadLocal<ArrayList<String>> bestBrokers; + private final ThreadLocal<HashSet<String>> noLoadDataBrokers; public LeastResourceUsageWithWeight() { - this.bestBrokers = new ArrayList<>(); - this.noLoadDataBrokers = new HashSet<>(); + this.bestBrokers = ThreadLocal.withInitial(ArrayList::new); + this.noLoadDataBrokers = ThreadLocal.withInitial(HashSet::new); } // A broker's max resource usage with weight using its historical load and short-term load data with weight. @@ -70,7 +72,6 @@ public class LeastResourceUsageWithWeight implements BrokerSelectionStrategy { /** * Find a suitable broker to assign the given bundle to. - * This method is not thread safety. * * @param candidates The candidates for which the bundle may be assigned. * @param bundleToAssign The data for the bundle to assign. @@ -86,6 +87,9 @@ public class LeastResourceUsageWithWeight implements BrokerSelectionStrategy { return Optional.empty(); } + ArrayList<String> bestBrokers = this.bestBrokers.get(); + HashSet<String> noLoadDataBrokers = this.noLoadDataBrokers.get(); + bestBrokers.clear(); noLoadDataBrokers.clear(); // Maintain of list of all the best scoring brokers and then randomly @@ -135,9 +139,7 @@ public class LeastResourceUsageWithWeight implements BrokerSelectionStrategy { log.info("Assign randomly as none of the brokers are underloaded. candidatesSize:{}, " + "noLoadDataBrokersSize:{}", candidates.size(), noLoadDataBrokers.size()); } - for (String broker : candidates) { - bestBrokers.add(broker); - } + bestBrokers.addAll(candidates); } if (debugMode) {