Copilot commented on code in PR #10745: URL: https://github.com/apache/dubbo/pull/10745#discussion_r2740274964
########## dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AdaptiveMetrics.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc; + +import org.apache.dubbo.common.utils.StringUtils; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * adaptive Metrics statistics. + */ +public class AdaptiveMetrics { + + private ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new ConcurrentHashMap<>(); + + private long currentProviderTime = 0; + private double providerCPULoad = 0; + private long lastLatency = 0; + private long currentTime = 0; + + //Allow some time disorder + private long pickTime = System.currentTimeMillis(); + + private double beta = 0.5; + private final AtomicLong consumerReq = new AtomicLong(); + private final AtomicLong consumerSuccess = new AtomicLong(); + private final AtomicLong errorReq = new AtomicLong(); + private double ewma = 0; + + public double getLoad(String idKey,int weight,int timeout){ + AdaptiveMetrics metrics = getStatus(idKey); + + //If the time more than 2 times, mandatory selected + if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) { + return 0; + } + + if (metrics.currentTime > 0){ + long multiple = (System.currentTimeMillis() - metrics.currentTime) / timeout + 1; + if (multiple > 0) { + if (metrics.currentProviderTime == metrics.currentTime) { + //penalty value + metrics.lastLatency = timeout * 2L; + }else { + metrics.lastLatency = metrics.lastLatency >> multiple; Review Comment: Potential bit shift issue with negative values. The expression metrics.lastLatency >> multiple performs an arithmetic right shift which preserves the sign bit. If lastLatency becomes negative (which shouldn't happen in normal operation but isn't explicitly prevented), this could produce unexpected results. Consider using unsigned right shift (>>>) or adding validation to ensure lastLatency remains non-negative. ########## dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AdaptiveMetrics.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc; + +import org.apache.dubbo.common.utils.StringUtils; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * adaptive Metrics statistics. + */ +public class AdaptiveMetrics { + + private ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new ConcurrentHashMap<>(); + + private long currentProviderTime = 0; + private double providerCPULoad = 0; + private long lastLatency = 0; + private long currentTime = 0; + + //Allow some time disorder + private long pickTime = System.currentTimeMillis(); + + private double beta = 0.5; + private final AtomicLong consumerReq = new AtomicLong(); + private final AtomicLong consumerSuccess = new AtomicLong(); + private final AtomicLong errorReq = new AtomicLong(); + private double ewma = 0; + + public double getLoad(String idKey,int weight,int timeout){ + AdaptiveMetrics metrics = getStatus(idKey); + + //If the time more than 2 times, mandatory selected + if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) { + return 0; + } + + if (metrics.currentTime > 0){ + long multiple = (System.currentTimeMillis() - metrics.currentTime) / timeout + 1; + if (multiple > 0) { + if (metrics.currentProviderTime == metrics.currentTime) { + //penalty value + metrics.lastLatency = timeout * 2L; + }else { + metrics.lastLatency = metrics.lastLatency >> multiple; + } + metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * metrics.lastLatency; + metrics.currentTime = System.currentTimeMillis(); + } + } + + long inflight = metrics.consumerReq.get() - metrics.consumerSuccess.get() - metrics.errorReq.get(); + return metrics.providerCPULoad * (Math.sqrt(metrics.ewma) + 1) * (inflight + 1) / ((((double)metrics.consumerSuccess.get() / (double)(metrics.consumerReq.get() + 1)) * weight) + 1); + } + + public AdaptiveMetrics getStatus(String idKey){ + return metricsStatistics.computeIfAbsent(idKey, k -> new AdaptiveMetrics()); + } + + public void addConsumerReq(String idKey){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.consumerReq.incrementAndGet(); + } + + public void addConsumerSuccess(String idKey){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.consumerSuccess.incrementAndGet(); + } + + public void addErrorReq(String idKey){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.errorReq.incrementAndGet(); + } + + public void setPickTime(String idKey,long time){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.pickTime = time; + } + + + + public void setProviderMetrics(String idKey,Map<String,String> metricsMap){ + + AdaptiveMetrics metrics = getStatus(idKey); + + long serviceTime = Long.parseLong(Optional.ofNullable(metricsMap.get("curTime")).filter(v -> StringUtils.isNumeric(v,false)).orElse("0")); + //If server time is less than the current time, discard + if (metrics.currentProviderTime > serviceTime){ + return; + } + + metrics.currentProviderTime = serviceTime; + metrics.currentTime = serviceTime; + metrics.providerCPULoad = Double.parseDouble(Optional.ofNullable(metricsMap.get("load")).filter(v -> StringUtils.isNumeric(v,true)).orElse("0")); + metrics.lastLatency = Long.parseLong((Optional.ofNullable(metricsMap.get("rt")).filter(v -> StringUtils.isNumeric(v,false)).orElse("0"))); + + metrics.beta = 0.5; Review Comment: Magic number without explanation. The value 0.5 for beta is hardcoded without documentation explaining why this specific value was chosen for the exponential weighted moving average calculation. Consider adding a comment explaining this is a standard EWMA smoothing factor or making it configurable if different scenarios require different values. ########## dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AdaptiveMetrics.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc; + +import org.apache.dubbo.common.utils.StringUtils; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * adaptive Metrics statistics. + */ +public class AdaptiveMetrics { + + private ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new ConcurrentHashMap<>(); + + private long currentProviderTime = 0; + private double providerCPULoad = 0; + private long lastLatency = 0; + private long currentTime = 0; + + //Allow some time disorder + private long pickTime = System.currentTimeMillis(); + + private double beta = 0.5; + private final AtomicLong consumerReq = new AtomicLong(); + private final AtomicLong consumerSuccess = new AtomicLong(); + private final AtomicLong errorReq = new AtomicLong(); + private double ewma = 0; + + public double getLoad(String idKey,int weight,int timeout){ + AdaptiveMetrics metrics = getStatus(idKey); + + //If the time more than 2 times, mandatory selected + if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) { + return 0; + } + + if (metrics.currentTime > 0){ + long multiple = (System.currentTimeMillis() - metrics.currentTime) / timeout + 1; + if (multiple > 0) { + if (metrics.currentProviderTime == metrics.currentTime) { + //penalty value + metrics.lastLatency = timeout * 2L; + }else { + metrics.lastLatency = metrics.lastLatency >> multiple; + } + metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * metrics.lastLatency; + metrics.currentTime = System.currentTimeMillis(); + } + } + + long inflight = metrics.consumerReq.get() - metrics.consumerSuccess.get() - metrics.errorReq.get(); + return metrics.providerCPULoad * (Math.sqrt(metrics.ewma) + 1) * (inflight + 1) / ((((double)metrics.consumerSuccess.get() / (double)(metrics.consumerReq.get() + 1)) * weight) + 1); + } + + public AdaptiveMetrics getStatus(String idKey){ + return metricsStatistics.computeIfAbsent(idKey, k -> new AdaptiveMetrics()); + } + + public void addConsumerReq(String idKey){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.consumerReq.incrementAndGet(); + } + + public void addConsumerSuccess(String idKey){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.consumerSuccess.incrementAndGet(); + } + + public void addErrorReq(String idKey){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.errorReq.incrementAndGet(); + } + + public void setPickTime(String idKey,long time){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.pickTime = time; + } + + + + public void setProviderMetrics(String idKey,Map<String,String> metricsMap){ + + AdaptiveMetrics metrics = getStatus(idKey); + + long serviceTime = Long.parseLong(Optional.ofNullable(metricsMap.get("curTime")).filter(v -> StringUtils.isNumeric(v,false)).orElse("0")); + //If server time is less than the current time, discard + if (metrics.currentProviderTime > serviceTime){ + return; + } + + metrics.currentProviderTime = serviceTime; + metrics.currentTime = serviceTime; + metrics.providerCPULoad = Double.parseDouble(Optional.ofNullable(metricsMap.get("load")).filter(v -> StringUtils.isNumeric(v,true)).orElse("0")); + metrics.lastLatency = Long.parseLong((Optional.ofNullable(metricsMap.get("rt")).filter(v -> StringUtils.isNumeric(v,false)).orElse("0"))); + + metrics.beta = 0.5; + //Vt = β * Vt-1 + (1 - β ) * θt + metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * metrics.lastLatency; + Review Comment: Thread-safety issue: Multiple threads can access and modify metrics fields (currentProviderTime, providerCPULoad, lastLatency, currentTime, ewma) without synchronization. The ConcurrentHashMap ensures that different AdaptiveMetrics instances are handled correctly, but within a single instance, these fields are not thread-safe. Consider using synchronized methods or atomic variables for these fields. ```suggestion synchronized (metrics) { //If server time is less than the current time, discard if (metrics.currentProviderTime > serviceTime){ return; } metrics.currentProviderTime = serviceTime; metrics.currentTime = serviceTime; metrics.providerCPULoad = Double.parseDouble(Optional.ofNullable(metricsMap.get("load")).filter(v -> StringUtils.isNumeric(v,true)).orElse("0")); metrics.lastLatency = Long.parseLong((Optional.ofNullable(metricsMap.get("rt")).filter(v -> StringUtils.isNumeric(v,false)).orElse("0"))); metrics.beta = 0.5; //Vt = β * Vt-1 + (1 - β ) * θt metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * metrics.lastLatency; } ``` ########## dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/AdaptiveLoadBalanceTest.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.cluster.loadbalance; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.rpc.AdaptiveMetrics; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.model.ApplicationModel; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +class AdaptiveLoadBalanceTest extends LoadBalanceBaseTest { + + private ApplicationModel scopeModel; + + private AdaptiveMetrics adaptiveMetrics; + + @Test + @Order(0) + void testSelectByWeight() { + int sumInvoker1 = 0; + int sumInvoker2 = 0; + int sumInvoker3 = 0; + int loop = 10000; + + ApplicationModel scopeModel = ApplicationModel.defaultModel(); + + AdaptiveLoadBalance lb = new AdaptiveLoadBalance(scopeModel); + for (int i = 0; i < loop; i++) { + Invoker selected = lb.select(weightInvokers, null, weightTestInvocation); + + if (selected.getUrl().getProtocol().equals("test1")) { + sumInvoker1++; + } + + if (selected.getUrl().getProtocol().equals("test2")) { + sumInvoker2++; + } + + if (selected.getUrl().getProtocol().equals("test3")) { + sumInvoker3++; + } + } + + // 1 : 9 : 6 + System.out.println(sumInvoker1); + System.out.println(sumInvoker2); + System.out.println(sumInvoker3); + Assertions.assertEquals(sumInvoker1 + sumInvoker2 + sumInvoker3, loop, "select failed!"); + } + + private String buildServiceKey(Invoker invoker){ + URL url = invoker.getUrl(); + return url.getAddress() + ":" + invocation.getProtocolServiceKey(); + } + + private AdaptiveMetrics getAdaptiveMetricsInstance(){ + if (adaptiveMetrics == null) { + adaptiveMetrics = scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class); + } + return adaptiveMetrics; + } + + @Test + @Order(1) + void testSelectByAdaptive() { + int sumInvoker1 = 0; + int sumInvoker2 = 0; + int sumInvoker5 = 0; + int loop = 10000; + + scopeModel = ApplicationModel.defaultModel(); + AdaptiveLoadBalance lb = new AdaptiveLoadBalance(scopeModel); + + lb.select(weightInvokersSR, null, weightTestInvocation); + + for (int i = 0; i < loop; i++) { + Invoker selected = lb.select(weightInvokersSR, null, weightTestInvocation); + + Map<String, String> metricsMap = new HashMap<>(); + String idKey = buildServiceKey(selected); + + if (selected.getUrl().getProtocol().equals("test1")) { + sumInvoker1++; + metricsMap.put("rt", "10"); + metricsMap.put("load", "10"); + metricsMap.put("curTime", String.valueOf(System.currentTimeMillis()-10)); + getAdaptiveMetricsInstance().addConsumerSuccess(idKey); + } + + if (selected.getUrl().getProtocol().equals("test2")) { + sumInvoker2++; + metricsMap.put("rt", "100"); + metricsMap.put("load", "40"); + metricsMap.put("curTime", String.valueOf(System.currentTimeMillis()-100)); + getAdaptiveMetricsInstance().addConsumerSuccess(idKey); + } + + if (selected.getUrl().getProtocol().equals("test5")) { + metricsMap.put("rt", "5000"); + metricsMap.put("load", "400");//400% + metricsMap.put("curTime", String.valueOf(System.currentTimeMillis() - 5000)); + + getAdaptiveMetricsInstance().addErrorReq(idKey); + sumInvoker5++; + } + getAdaptiveMetricsInstance().setProviderMetrics(idKey,metricsMap); + + } + Map<Invoker<LoadBalanceBaseTest>, Integer> weightMap = weightInvokersSR.stream() + .collect(Collectors.toMap(Function.identity(), e -> Integer.valueOf(e.getUrl().getParameter("weight")))); + Integer totalWeight = weightMap.values().stream().reduce(0, Integer::sum); + // max deviation = expectWeightValue * 2 + int expectWeightValue = loop / totalWeight; + int maxDeviation = expectWeightValue * 2; + double beta = 0.5; + //this EMA is an approximate value + double ewma1 = beta * 50 + (1 - beta) * 10; + double ewma2 = beta * 50 + (1 - beta) * 100; + double ewma5 = beta * 50 + (1 - beta) * 5000; + + AtomicInteger weight1 = new AtomicInteger(); + AtomicInteger weight2 = new AtomicInteger(); + AtomicInteger weight5 = new AtomicInteger(); + weightMap.forEach((k, v) ->{ + if (k.getUrl().getProtocol().equals("test1")){ + weight1.set(v); + } + else if (k.getUrl().getProtocol().equals("test2")){ + weight2.set(v); + } + else if (k.getUrl().getProtocol().equals("test5")){ + weight5.set(v); + } + }); + + Assertions.assertEquals(sumInvoker1 + sumInvoker2 + sumInvoker5, loop, "select failed!"); + Assertions.assertTrue(Math.abs(sumInvoker1 / (weightMap.get(weightInvoker1) * ewma1) - expectWeightValue) < maxDeviation, "select failed!"); + Assertions.assertTrue(Math.abs(sumInvoker2 / (weightMap.get(weightInvoker2) * ewma2) - expectWeightValue) < maxDeviation, "select failed!"); + Assertions.assertTrue(Math.abs(sumInvoker5 / (weightMap.get(weightInvoker5) * ewma5) - expectWeightValue) < maxDeviation, "select failed!"); Review Comment: The test assertion logic is incorrect. Lines 163-165 attempt to calculate expected values based on weight and ewma, but the division operations don't match the load calculation formula used in the actual code. In AdaptiveMetrics.getLoad(), the formula divides by ((successRate * weight) + 1), not by (weight * ewma). The test's assertion logic should match the actual implementation's load calculation formula for accurate verification. ```suggestion // successRate is 1.0 for test1 and test2 (all successes), and 0.0 for test5 (all errors) in this test. Assertions.assertTrue(Math.abs(sumInvoker1 * ewma1 / ((1.0 * weight1.get()) + 1) - expectWeightValue) < maxDeviation, "select failed!"); Assertions.assertTrue(Math.abs(sumInvoker2 * ewma2 / ((1.0 * weight2.get()) + 1) - expectWeightValue) < maxDeviation, "select failed!"); Assertions.assertTrue(Math.abs(sumInvoker5 * ewma5 / ((0.0 * weight5.get()) + 1) - expectWeightValue) < maxDeviation, "select failed!"); ``` ########## dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/AdaptiveLoadBalanceFilter.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.filter; + +import org.apache.dubbo.common.constants.LoadbalanceRules; +import org.apache.dubbo.common.extension.Activate; +import org.apache.dubbo.common.resource.GlobalResourcesRepository; +import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.rpc.AdaptiveMetrics; +import org.apache.dubbo.rpc.Constants; +import org.apache.dubbo.rpc.Filter; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.model.ApplicationModel; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN; +import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER; +import static org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY; + +/** + * if the load balance is adaptive ,set attachment to get the metrics of the server + * @see org.apache.dubbo.rpc.Filter + * @see org.apache.dubbo.rpc.RpcContext + */ +@Activate(group = CONSUMER, order = -200000, value = {"loadbalance:adaptive"}) +public class AdaptiveLoadBalanceFilter implements Filter, Filter.Listener { + + /** + * uses a single worker thread operating off an bounded queue + */ + private volatile ThreadPoolExecutor executor = null; + + private AdaptiveMetrics adaptiveMetrics; + + public AdaptiveLoadBalanceFilter(ApplicationModel scopeModel) { + adaptiveMetrics = scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class); + } + + private ThreadPoolExecutor getExecutor(){ + if (null == executor) { + synchronized (this) { + if (null == executor) { + executor = new ThreadPoolExecutor(1, 1, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), + new NamedInternalThreadFactory("Dubbo-framework-loadbalance-adaptive", true), new ThreadPoolExecutor.DiscardOldestPolicy()); + GlobalResourcesRepository.getInstance().registerDisposable(() -> this.executor.shutdown()); + } + } + } + return executor; + } + + @Override + public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { + return invoker.invoke(invocation); + } + + private String buildServiceKey(Invocation invocation){ + StringBuilder sb = new StringBuilder(128); + sb.append(invocation.getInvoker().getUrl().getAddress()).append(":").append(invocation.getProtocolServiceKey()); + return sb.toString(); + } + + private String getServiceKey(Invocation invocation){ + + String key = (String) invocation.getAttributes().get(invocation.getInvoker()); + if (StringUtils.isNotEmpty(key)){ + return key; + } + + key = buildServiceKey(invocation); + invocation.getAttributes().put(invocation.getInvoker(),key); + return key; + } + + @Override + public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) { + + try { + String loadBalance = (String) invocation.getAttributes().get(LOADBALANCE_KEY); + if (StringUtils.isEmpty(loadBalance) + || !LoadbalanceRules.ADAPTIVE.equals(loadBalance)) { + return; + } + adaptiveMetrics.addConsumerSuccess(getServiceKey(invocation)); + String attachment = appResponse.getAttachment(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY); + if (StringUtils.isNotEmpty(attachment)) { + String[] parties = COMMA_SPLIT_PATTERN.split(attachment); + if (parties.length == 0) { + return; + } + Map<String, String> metricsMap = new HashMap<>(); + for (String party : parties) { + String[] groups = party.split(":"); + if (groups.length != 2) { + continue; + } + metricsMap.put(groups[0], groups[1]); + } + + Long startTime = (Long) invocation.getAttributes().get(Constants.ADAPTIVE_LOADBALANCE_START_TIME); + if (null != startTime) { + metricsMap.put("rt", String.valueOf(System.currentTimeMillis() - startTime)); + } + + getExecutor().execute(() -> { + adaptiveMetrics.setProviderMetrics(getServiceKey(invocation), metricsMap); + }); + } + } + finally { + appResponse.getAttachments().remove(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY); + } + + } + + @Override + public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) { + String loadBalance = (String) invocation.getAttributes().get(LOADBALANCE_KEY); + if (StringUtils.isNotEmpty(loadBalance) + && LoadbalanceRules.ADAPTIVE.equals(loadBalance)) { + getExecutor().execute(() -> { + adaptiveMetrics.addErrorReq(getServiceKey(invocation)); + }); + } + } + + +} Review Comment: Missing test coverage for AdaptiveLoadBalanceFilter. The repository has comprehensive test coverage for other filters in the same package (ActiveLimitFilterTest, ContextFilterTest, etc.), but there's no test file for AdaptiveLoadBalanceFilter. Unit tests should be added to verify the filter's behavior including attachment handling, metrics collection, and error scenarios. ########## dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AdaptiveMetrics.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc; + +import org.apache.dubbo.common.utils.StringUtils; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * adaptive Metrics statistics. + */ +public class AdaptiveMetrics { + + private ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new ConcurrentHashMap<>(); + + private long currentProviderTime = 0; + private double providerCPULoad = 0; + private long lastLatency = 0; + private long currentTime = 0; + + //Allow some time disorder + private long pickTime = System.currentTimeMillis(); + + private double beta = 0.5; + private final AtomicLong consumerReq = new AtomicLong(); + private final AtomicLong consumerSuccess = new AtomicLong(); + private final AtomicLong errorReq = new AtomicLong(); + private double ewma = 0; + + public double getLoad(String idKey,int weight,int timeout){ + AdaptiveMetrics metrics = getStatus(idKey); + + //If the time more than 2 times, mandatory selected + if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) { + return 0; + } + + if (metrics.currentTime > 0){ + long multiple = (System.currentTimeMillis() - metrics.currentTime) / timeout + 1; + if (multiple > 0) { + if (metrics.currentProviderTime == metrics.currentTime) { + //penalty value + metrics.lastLatency = timeout * 2L; + }else { + metrics.lastLatency = metrics.lastLatency >> multiple; + } + metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * metrics.lastLatency; + metrics.currentTime = System.currentTimeMillis(); + } Review Comment: Redundant condition check. The condition 'multiple > 0' on line 58 is always true because multiple is calculated as (System.currentTimeMillis() - metrics.currentTime) / timeout + 1. Since currentTime is less than currentTimeMillis (we're in the if block where currentTime > 0), and timeout is positive, multiple will always be at least 1. This check is unnecessary and can be removed. ```suggestion if (metrics.currentProviderTime == metrics.currentTime) { //penalty value metrics.lastLatency = timeout * 2L; }else { metrics.lastLatency = metrics.lastLatency >> multiple; } metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * metrics.lastLatency; metrics.currentTime = System.currentTimeMillis(); ``` ########## dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ProfilerServerFilter.java: ########## @@ -78,6 +85,18 @@ private void afterInvoke(Invoker<?> invoker, Invocation invocation) { } } } + private void addAdaptiveResponse(Result appResponse, Invocation invocation) { + String adaptiveLoadAttachment = invocation.getAttachment(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY); + if (StringUtils.isNotEmpty(adaptiveLoadAttachment)) { + OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean(); + + StringBuilder sb = new StringBuilder(64); + sb.append("curTime:").append(System.currentTimeMillis()); + sb.append(COMMA_SEPARATOR).append("load:").append(operatingSystemMXBean.getSystemLoadAverage() * 100 / operatingSystemMXBean.getAvailableProcessors() ); + Review Comment: No error handling for OperatingSystemMXBean operations. getSystemLoadAverage() can return -1 on systems where load average is not available (e.g., Windows). This would result in a negative load value being sent in the attachment. Consider adding a check: if (loadAvg < 0) to handle this case appropriately. ```suggestion double loadAvg = operatingSystemMXBean.getSystemLoadAverage(); int availableProcessors = operatingSystemMXBean.getAvailableProcessors(); long loadValue; if (loadAvg >= 0 && availableProcessors > 0) { loadValue = Math.round(loadAvg * 100 / availableProcessors); } else { // Fallback when system load average is not available or processor count is invalid loadValue = 0L; } sb.append(COMMA_SEPARATOR).append("load:").append(loadValue); ``` ########## dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java: ########## @@ -104,9 +104,12 @@ public interface Constants { String H2_SETTINGS_MAX_FRAME_SIZE_KEY = "dubbo.rpc.tri.max-frame-size"; String H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY = "dubbo.rpc.tri.max-header-list-size"; + String ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY = "lb_adaptive"; + String ADAPTIVE_LOADBALANCE_START_TIME = "adaptive_startTime"; Review Comment: Constants are inserted in the middle of H2-related settings group, breaking logical grouping. The adaptive load balance constants (lines 107-108) are placed between H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY and H2_SUPPORT_NO_LOWER_HEADER_KEY. Consider moving these constants to a separate section or grouping them with other load balance related constants for better code organization. ########## dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/AdaptiveLoadBalance.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.cluster.loadbalance; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.constants.LoadbalanceRules; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.rpc.AdaptiveMetrics; +import org.apache.dubbo.rpc.Constants; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.RpcContext; +import org.apache.dubbo.rpc.model.ApplicationModel; +import org.apache.dubbo.rpc.support.RpcUtils; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT; +import static org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY; + +/** + * AdaptiveLoadBalance + * </p> + */ +public class AdaptiveLoadBalance extends AbstractLoadBalance { + + public static final String NAME = "adaptive"; + + //default key + private String attachmentKey = "mem,load"; + + private AdaptiveMetrics adaptiveMetrics; + + public AdaptiveLoadBalance(ApplicationModel scopeModel){ + adaptiveMetrics = scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class); + } + + @Override + protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { + Invoker<T> invoker = selectByP2C(invokers,invocation); + invocation.setAttachment(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY,attachmentKey); + long startTime = System.currentTimeMillis(); + invocation.getAttributes().put(Constants.ADAPTIVE_LOADBALANCE_START_TIME,startTime); + invocation.getAttributes().put(LOADBALANCE_KEY,LoadbalanceRules.ADAPTIVE); + adaptiveMetrics.addConsumerReq(getServiceKey(invoker,invocation)); + adaptiveMetrics.setPickTime(getServiceKey(invoker,invocation),startTime); + + return invoker; + } + + private <T> Invoker<T> selectByP2C(List<Invoker<T>> invokers, Invocation invocation){ + int length = invokers.size(); + if(length == 1) { + return invokers.get(0); + } + + if(length == 2) { + return chooseLowLoadInvoker(invokers.get(0),invokers.get(1),invocation); + } + + int pos1 = ThreadLocalRandom.current().nextInt(length); + int pos2 = ThreadLocalRandom.current().nextInt(length - 1); + if (pos2 >= pos1) { + pos2 = pos2 + 1; + } + + return chooseLowLoadInvoker(invokers.get(pos1),invokers.get(pos2),invocation); + } + + private String getServiceKey(Invoker<?> invoker,Invocation invocation){ + + String key = (String) invocation.getAttributes().get(invoker); + if (StringUtils.isNotEmpty(key)){ + return key; + } + + key = buildServiceKey(invoker,invocation); + invocation.getAttributes().put(invoker,key); + return key; + } + + private String buildServiceKey(Invoker<?> invoker,Invocation invocation){ + URL url = invoker.getUrl(); + StringBuilder sb = new StringBuilder(128); + sb.append(url.getAddress()).append(":").append(invocation.getProtocolServiceKey()); + return sb.toString(); + } + + private int getTimeout(Invoker<?> invoker, Invocation invocation) { + URL url = invoker.getUrl(); + String methodName = RpcUtils.getMethodName(invocation); + return (int) RpcUtils.getTimeout(url,methodName, RpcContext.getClientAttachment(),invocation, DEFAULT_TIMEOUT); + } + + private <T> Invoker<T> chooseLowLoadInvoker(Invoker<T> invoker1,Invoker<T> invoker2,Invocation invocation){ + int weight1 = getWeight(invoker1, invocation); + int weight2 = getWeight(invoker2, invocation); + int timeout1 = getTimeout(invoker2, invocation); Review Comment: Both timeout1 and timeout2 are incorrectly calling getTimeout with invoker2. Line 113 should use invoker1 instead of invoker2 to get the correct timeout value for the first invoker. ```suggestion int timeout1 = getTimeout(invoker1, invocation); ``` ########## dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/AdaptiveLoadBalance.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.cluster.loadbalance; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.constants.LoadbalanceRules; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.rpc.AdaptiveMetrics; +import org.apache.dubbo.rpc.Constants; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.RpcContext; +import org.apache.dubbo.rpc.model.ApplicationModel; +import org.apache.dubbo.rpc.support.RpcUtils; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT; +import static org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY; + +/** + * AdaptiveLoadBalance + * </p> + */ +public class AdaptiveLoadBalance extends AbstractLoadBalance { + + public static final String NAME = "adaptive"; + + //default key + private String attachmentKey = "mem,load"; Review Comment: Unused field 'attachmentKey' is set to a default value but never modified or read in any meaningful way. The field is set in line 45 and used in line 56, but the value is never configured from outside. If this is intended to be configurable, it should have a setter or be initialized from configuration. If it's not meant to be configurable, consider making it a constant or removing it if it's truly unused. ########## dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AdaptiveMetrics.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc; + +import org.apache.dubbo.common.utils.StringUtils; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * adaptive Metrics statistics. + */ +public class AdaptiveMetrics { + + private ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new ConcurrentHashMap<>(); + + private long currentProviderTime = 0; + private double providerCPULoad = 0; + private long lastLatency = 0; + private long currentTime = 0; + + //Allow some time disorder + private long pickTime = System.currentTimeMillis(); + + private double beta = 0.5; + private final AtomicLong consumerReq = new AtomicLong(); + private final AtomicLong consumerSuccess = new AtomicLong(); + private final AtomicLong errorReq = new AtomicLong(); + private double ewma = 0; + + public double getLoad(String idKey,int weight,int timeout){ + AdaptiveMetrics metrics = getStatus(idKey); + + //If the time more than 2 times, mandatory selected + if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) { + return 0; + } + + if (metrics.currentTime > 0){ + long multiple = (System.currentTimeMillis() - metrics.currentTime) / timeout + 1; + if (multiple > 0) { + if (metrics.currentProviderTime == metrics.currentTime) { + //penalty value + metrics.lastLatency = timeout * 2L; + }else { + metrics.lastLatency = metrics.lastLatency >> multiple; + } + metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * metrics.lastLatency; + metrics.currentTime = System.currentTimeMillis(); + } + } + + long inflight = metrics.consumerReq.get() - metrics.consumerSuccess.get() - metrics.errorReq.get(); + return metrics.providerCPULoad * (Math.sqrt(metrics.ewma) + 1) * (inflight + 1) / ((((double)metrics.consumerSuccess.get() / (double)(metrics.consumerReq.get() + 1)) * weight) + 1); + } + + public AdaptiveMetrics getStatus(String idKey){ + return metricsStatistics.computeIfAbsent(idKey, k -> new AdaptiveMetrics()); + } + + public void addConsumerReq(String idKey){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.consumerReq.incrementAndGet(); + } + + public void addConsumerSuccess(String idKey){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.consumerSuccess.incrementAndGet(); + } + + public void addErrorReq(String idKey){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.errorReq.incrementAndGet(); + } + + public void setPickTime(String idKey,long time){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.pickTime = time; + } + + + + public void setProviderMetrics(String idKey,Map<String,String> metricsMap){ + + AdaptiveMetrics metrics = getStatus(idKey); + + long serviceTime = Long.parseLong(Optional.ofNullable(metricsMap.get("curTime")).filter(v -> StringUtils.isNumeric(v,false)).orElse("0")); + //If server time is less than the current time, discard + if (metrics.currentProviderTime > serviceTime){ + return; + } + + metrics.currentProviderTime = serviceTime; + metrics.currentTime = serviceTime; + metrics.providerCPULoad = Double.parseDouble(Optional.ofNullable(metricsMap.get("load")).filter(v -> StringUtils.isNumeric(v,true)).orElse("0")); + metrics.lastLatency = Long.parseLong((Optional.ofNullable(metricsMap.get("rt")).filter(v -> StringUtils.isNumeric(v,false)).orElse("0"))); + + metrics.beta = 0.5; + //Vt = β * Vt-1 + (1 - β ) * θt + metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * metrics.lastLatency; + + } +} + Review Comment: Missing test coverage for AdaptiveMetrics. No unit tests are provided for the AdaptiveMetrics class which contains complex calculation logic including EWMA calculations, load scoring, and concurrent data structures. Tests should verify the correctness of getLoad calculations, thread safety of concurrent operations, and edge cases like timeout handling and metrics decay. ########## dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AdaptiveMetrics.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc; + +import org.apache.dubbo.common.utils.StringUtils; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * adaptive Metrics statistics. + */ +public class AdaptiveMetrics { + + private ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new ConcurrentHashMap<>(); + + private long currentProviderTime = 0; + private double providerCPULoad = 0; + private long lastLatency = 0; + private long currentTime = 0; + + //Allow some time disorder + private long pickTime = System.currentTimeMillis(); + + private double beta = 0.5; + private final AtomicLong consumerReq = new AtomicLong(); + private final AtomicLong consumerSuccess = new AtomicLong(); + private final AtomicLong errorReq = new AtomicLong(); + private double ewma = 0; + + public double getLoad(String idKey,int weight,int timeout){ + AdaptiveMetrics metrics = getStatus(idKey); + + //If the time more than 2 times, mandatory selected + if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) { + return 0; + } + + if (metrics.currentTime > 0){ + long multiple = (System.currentTimeMillis() - metrics.currentTime) / timeout + 1; + if (multiple > 0) { + if (metrics.currentProviderTime == metrics.currentTime) { + //penalty value + metrics.lastLatency = timeout * 2L; + }else { + metrics.lastLatency = metrics.lastLatency >> multiple; + } + metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * metrics.lastLatency; + metrics.currentTime = System.currentTimeMillis(); + } + } + + long inflight = metrics.consumerReq.get() - metrics.consumerSuccess.get() - metrics.errorReq.get(); + return metrics.providerCPULoad * (Math.sqrt(metrics.ewma) + 1) * (inflight + 1) / ((((double)metrics.consumerSuccess.get() / (double)(metrics.consumerReq.get() + 1)) * weight) + 1); + } + + public AdaptiveMetrics getStatus(String idKey){ + return metricsStatistics.computeIfAbsent(idKey, k -> new AdaptiveMetrics()); + } + + public void addConsumerReq(String idKey){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.consumerReq.incrementAndGet(); + } + + public void addConsumerSuccess(String idKey){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.consumerSuccess.incrementAndGet(); + } + + public void addErrorReq(String idKey){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.errorReq.incrementAndGet(); + } + + public void setPickTime(String idKey,long time){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.pickTime = time; + } + + + + public void setProviderMetrics(String idKey,Map<String,String> metricsMap){ + + AdaptiveMetrics metrics = getStatus(idKey); + + long serviceTime = Long.parseLong(Optional.ofNullable(metricsMap.get("curTime")).filter(v -> StringUtils.isNumeric(v,false)).orElse("0")); + //If server time is less than the current time, discard + if (metrics.currentProviderTime > serviceTime){ + return; + } + + metrics.currentProviderTime = serviceTime; + metrics.currentTime = serviceTime; + metrics.providerCPULoad = Double.parseDouble(Optional.ofNullable(metricsMap.get("load")).filter(v -> StringUtils.isNumeric(v,true)).orElse("0")); Review Comment: Potential uncaught 'java.lang.NumberFormatException'. ########## dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AdaptiveMetrics.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc; + +import org.apache.dubbo.common.utils.StringUtils; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * adaptive Metrics statistics. + */ +public class AdaptiveMetrics { + + private ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new ConcurrentHashMap<>(); + + private long currentProviderTime = 0; + private double providerCPULoad = 0; + private long lastLatency = 0; + private long currentTime = 0; + + //Allow some time disorder + private long pickTime = System.currentTimeMillis(); + + private double beta = 0.5; + private final AtomicLong consumerReq = new AtomicLong(); + private final AtomicLong consumerSuccess = new AtomicLong(); + private final AtomicLong errorReq = new AtomicLong(); + private double ewma = 0; + + public double getLoad(String idKey,int weight,int timeout){ + AdaptiveMetrics metrics = getStatus(idKey); + + //If the time more than 2 times, mandatory selected + if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) { + return 0; + } + + if (metrics.currentTime > 0){ + long multiple = (System.currentTimeMillis() - metrics.currentTime) / timeout + 1; + if (multiple > 0) { + if (metrics.currentProviderTime == metrics.currentTime) { + //penalty value + metrics.lastLatency = timeout * 2L; + }else { + metrics.lastLatency = metrics.lastLatency >> multiple; + } + metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * metrics.lastLatency; + metrics.currentTime = System.currentTimeMillis(); + } + } + + long inflight = metrics.consumerReq.get() - metrics.consumerSuccess.get() - metrics.errorReq.get(); + return metrics.providerCPULoad * (Math.sqrt(metrics.ewma) + 1) * (inflight + 1) / ((((double)metrics.consumerSuccess.get() / (double)(metrics.consumerReq.get() + 1)) * weight) + 1); + } + + public AdaptiveMetrics getStatus(String idKey){ + return metricsStatistics.computeIfAbsent(idKey, k -> new AdaptiveMetrics()); + } + + public void addConsumerReq(String idKey){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.consumerReq.incrementAndGet(); + } + + public void addConsumerSuccess(String idKey){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.consumerSuccess.incrementAndGet(); + } + + public void addErrorReq(String idKey){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.errorReq.incrementAndGet(); + } + + public void setPickTime(String idKey,long time){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.pickTime = time; + } + + + + public void setProviderMetrics(String idKey,Map<String,String> metricsMap){ + + AdaptiveMetrics metrics = getStatus(idKey); + + long serviceTime = Long.parseLong(Optional.ofNullable(metricsMap.get("curTime")).filter(v -> StringUtils.isNumeric(v,false)).orElse("0")); + //If server time is less than the current time, discard + if (metrics.currentProviderTime > serviceTime){ + return; + } + + metrics.currentProviderTime = serviceTime; + metrics.currentTime = serviceTime; + metrics.providerCPULoad = Double.parseDouble(Optional.ofNullable(metricsMap.get("load")).filter(v -> StringUtils.isNumeric(v,true)).orElse("0")); + metrics.lastLatency = Long.parseLong((Optional.ofNullable(metricsMap.get("rt")).filter(v -> StringUtils.isNumeric(v,false)).orElse("0"))); Review Comment: Potential uncaught 'java.lang.NumberFormatException'. ########## dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ProfilerServerFilter.java: ########## @@ -78,6 +85,18 @@ private void afterInvoke(Invoker<?> invoker, Invocation invocation) { } } } + private void addAdaptiveResponse(Result appResponse, Invocation invocation) { + String adaptiveLoadAttachment = invocation.getAttachment(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY); + if (StringUtils.isNotEmpty(adaptiveLoadAttachment)) { + OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean(); + + StringBuilder sb = new StringBuilder(64); Review Comment: Inefficient string building with fixed capacity. The StringBuilder is initialized with a capacity of 64 characters, but this may be excessive or insufficient depending on the actual data. The constructed string "curTime:1234567890123,load:45.67" is relatively short and predictable in length. Consider adjusting the initial capacity to match the expected size more accurately (approximately 30-40 characters) to reduce memory waste. ```suggestion StringBuilder sb = new StringBuilder(40); ``` ########## dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/AdaptiveLoadBalanceFilter.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.filter; + +import org.apache.dubbo.common.constants.LoadbalanceRules; +import org.apache.dubbo.common.extension.Activate; +import org.apache.dubbo.common.resource.GlobalResourcesRepository; +import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.rpc.AdaptiveMetrics; +import org.apache.dubbo.rpc.Constants; +import org.apache.dubbo.rpc.Filter; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.model.ApplicationModel; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN; +import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER; +import static org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY; + +/** + * if the load balance is adaptive ,set attachment to get the metrics of the server + * @see org.apache.dubbo.rpc.Filter + * @see org.apache.dubbo.rpc.RpcContext + */ +@Activate(group = CONSUMER, order = -200000, value = {"loadbalance:adaptive"}) +public class AdaptiveLoadBalanceFilter implements Filter, Filter.Listener { + + /** + * uses a single worker thread operating off an bounded queue + */ + private volatile ThreadPoolExecutor executor = null; + + private AdaptiveMetrics adaptiveMetrics; + + public AdaptiveLoadBalanceFilter(ApplicationModel scopeModel) { + adaptiveMetrics = scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class); + } + + private ThreadPoolExecutor getExecutor(){ + if (null == executor) { + synchronized (this) { + if (null == executor) { + executor = new ThreadPoolExecutor(1, 1, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), + new NamedInternalThreadFactory("Dubbo-framework-loadbalance-adaptive", true), new ThreadPoolExecutor.DiscardOldestPolicy()); + GlobalResourcesRepository.getInstance().registerDisposable(() -> this.executor.shutdown()); + } + } + } + return executor; + } + + @Override + public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { + return invoker.invoke(invocation); + } + + private String buildServiceKey(Invocation invocation){ + StringBuilder sb = new StringBuilder(128); + sb.append(invocation.getInvoker().getUrl().getAddress()).append(":").append(invocation.getProtocolServiceKey()); + return sb.toString(); + } + + private String getServiceKey(Invocation invocation){ + + String key = (String) invocation.getAttributes().get(invocation.getInvoker()); + if (StringUtils.isNotEmpty(key)){ + return key; + } + + key = buildServiceKey(invocation); + invocation.getAttributes().put(invocation.getInvoker(),key); + return key; + } + + @Override + public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) { + + try { + String loadBalance = (String) invocation.getAttributes().get(LOADBALANCE_KEY); + if (StringUtils.isEmpty(loadBalance) + || !LoadbalanceRules.ADAPTIVE.equals(loadBalance)) { + return; + } + adaptiveMetrics.addConsumerSuccess(getServiceKey(invocation)); + String attachment = appResponse.getAttachment(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY); + if (StringUtils.isNotEmpty(attachment)) { + String[] parties = COMMA_SPLIT_PATTERN.split(attachment); + if (parties.length == 0) { + return; + } + Map<String, String> metricsMap = new HashMap<>(); + for (String party : parties) { + String[] groups = party.split(":"); + if (groups.length != 2) { + continue; + } + metricsMap.put(groups[0], groups[1]); + } + + Long startTime = (Long) invocation.getAttributes().get(Constants.ADAPTIVE_LOADBALANCE_START_TIME); + if (null != startTime) { + metricsMap.put("rt", String.valueOf(System.currentTimeMillis() - startTime)); + } + + getExecutor().execute(() -> { + adaptiveMetrics.setProviderMetrics(getServiceKey(invocation), metricsMap); + }); + } + } + finally { + appResponse.getAttachments().remove(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY); + } Review Comment: Attachment cleanup occurs in finally block but should be conditional. The code removes the ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY from attachments regardless of whether the filter actually added it. If the filter logic in onResponse didn't execute due to early return (lines 103-106), removing the attachment could be incorrect. Consider moving the removal inside the try block after successful processing. ########## dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/AdaptiveLoadBalance.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.cluster.loadbalance; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.constants.LoadbalanceRules; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.rpc.AdaptiveMetrics; +import org.apache.dubbo.rpc.Constants; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.RpcContext; +import org.apache.dubbo.rpc.model.ApplicationModel; +import org.apache.dubbo.rpc.support.RpcUtils; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT; +import static org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY; + +/** + * AdaptiveLoadBalance + * </p> + */ +public class AdaptiveLoadBalance extends AbstractLoadBalance { + + public static final String NAME = "adaptive"; + + //default key + private String attachmentKey = "mem,load"; + + private AdaptiveMetrics adaptiveMetrics; + + public AdaptiveLoadBalance(ApplicationModel scopeModel){ + adaptiveMetrics = scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class); + } + + @Override + protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { + Invoker<T> invoker = selectByP2C(invokers,invocation); + invocation.setAttachment(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY,attachmentKey); + long startTime = System.currentTimeMillis(); + invocation.getAttributes().put(Constants.ADAPTIVE_LOADBALANCE_START_TIME,startTime); + invocation.getAttributes().put(LOADBALANCE_KEY,LoadbalanceRules.ADAPTIVE); + adaptiveMetrics.addConsumerReq(getServiceKey(invoker,invocation)); + adaptiveMetrics.setPickTime(getServiceKey(invoker,invocation),startTime); + + return invoker; + } + + private <T> Invoker<T> selectByP2C(List<Invoker<T>> invokers, Invocation invocation){ + int length = invokers.size(); + if(length == 1) { + return invokers.get(0); + } + + if(length == 2) { + return chooseLowLoadInvoker(invokers.get(0),invokers.get(1),invocation); + } + + int pos1 = ThreadLocalRandom.current().nextInt(length); + int pos2 = ThreadLocalRandom.current().nextInt(length - 1); + if (pos2 >= pos1) { + pos2 = pos2 + 1; + } + + return chooseLowLoadInvoker(invokers.get(pos1),invokers.get(pos2),invocation); + } + + private String getServiceKey(Invoker<?> invoker,Invocation invocation){ + + String key = (String) invocation.getAttributes().get(invoker); + if (StringUtils.isNotEmpty(key)){ + return key; + } + + key = buildServiceKey(invoker,invocation); + invocation.getAttributes().put(invoker,key); + return key; Review Comment: Invoker object is used as a map key in invocation attributes. On line 87, the invoker object itself is used as a key in invocation.getAttributes(). This is unusual and potentially problematic because invokers are heavyweight objects and using them as map keys may not provide the expected behavior if invoker instances are recreated. Consider using a string-based key or the invoker's URL as the key instead. ########## dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/AdaptiveLoadBalanceFilter.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.filter; + +import org.apache.dubbo.common.constants.LoadbalanceRules; +import org.apache.dubbo.common.extension.Activate; +import org.apache.dubbo.common.resource.GlobalResourcesRepository; +import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.rpc.AdaptiveMetrics; +import org.apache.dubbo.rpc.Constants; +import org.apache.dubbo.rpc.Filter; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.model.ApplicationModel; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN; +import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER; +import static org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY; + +/** + * if the load balance is adaptive ,set attachment to get the metrics of the server + * @see org.apache.dubbo.rpc.Filter + * @see org.apache.dubbo.rpc.RpcContext + */ +@Activate(group = CONSUMER, order = -200000, value = {"loadbalance:adaptive"}) +public class AdaptiveLoadBalanceFilter implements Filter, Filter.Listener { + + /** + * uses a single worker thread operating off an bounded queue + */ + private volatile ThreadPoolExecutor executor = null; + + private AdaptiveMetrics adaptiveMetrics; + + public AdaptiveLoadBalanceFilter(ApplicationModel scopeModel) { + adaptiveMetrics = scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class); + } + + private ThreadPoolExecutor getExecutor(){ + if (null == executor) { + synchronized (this) { + if (null == executor) { + executor = new ThreadPoolExecutor(1, 1, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), + new NamedInternalThreadFactory("Dubbo-framework-loadbalance-adaptive", true), new ThreadPoolExecutor.DiscardOldestPolicy()); + GlobalResourcesRepository.getInstance().registerDisposable(() -> this.executor.shutdown()); + } + } + } + return executor; + } Review Comment: ThreadPoolExecutor initialization and shutdown have potential resource leak. The executor is registered for disposal via GlobalResourcesRepository.getInstance().registerDisposable(), but if the filter is instantiated multiple times (e.g., in different application contexts), each instance will register its own shutdown hook. Additionally, the double-checked locking pattern used here is correct, but consider whether multiple filter instances sharing state through AdaptiveMetrics is the intended design. ########## dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/AdaptiveLoadBalanceTest.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.cluster.loadbalance; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.rpc.AdaptiveMetrics; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.model.ApplicationModel; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +class AdaptiveLoadBalanceTest extends LoadBalanceBaseTest { + + private ApplicationModel scopeModel; + + private AdaptiveMetrics adaptiveMetrics; + + @Test + @Order(0) + void testSelectByWeight() { + int sumInvoker1 = 0; + int sumInvoker2 = 0; + int sumInvoker3 = 0; + int loop = 10000; + + ApplicationModel scopeModel = ApplicationModel.defaultModel(); + + AdaptiveLoadBalance lb = new AdaptiveLoadBalance(scopeModel); + for (int i = 0; i < loop; i++) { + Invoker selected = lb.select(weightInvokers, null, weightTestInvocation); + + if (selected.getUrl().getProtocol().equals("test1")) { + sumInvoker1++; + } + + if (selected.getUrl().getProtocol().equals("test2")) { + sumInvoker2++; + } + + if (selected.getUrl().getProtocol().equals("test3")) { + sumInvoker3++; + } + } + + // 1 : 9 : 6 + System.out.println(sumInvoker1); + System.out.println(sumInvoker2); + System.out.println(sumInvoker3); Review Comment: Test uses System.out.println for output instead of proper logging or assertions. Lines 71-73 print sumInvoker values but don't verify them against expected values. Consider either removing these print statements or converting them to proper assertions that validate the distribution is within acceptable bounds. ```suggestion ``` ########## dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ProfilerServerFilter.java: ########## @@ -78,6 +85,18 @@ private void afterInvoke(Invoker<?> invoker, Invocation invocation) { } } } + private void addAdaptiveResponse(Result appResponse, Invocation invocation) { + String adaptiveLoadAttachment = invocation.getAttachment(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY); + if (StringUtils.isNotEmpty(adaptiveLoadAttachment)) { + OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean(); + + StringBuilder sb = new StringBuilder(64); + sb.append("curTime:").append(System.currentTimeMillis()); + sb.append(COMMA_SEPARATOR).append("load:").append(operatingSystemMXBean.getSystemLoadAverage() * 100 / operatingSystemMXBean.getAvailableProcessors() ); Review Comment: Potential integer overflow and incorrect percentage calculation. The expression operatingSystemMXBean.getSystemLoadAverage() * 100 / operatingSystemMXBean.getAvailableProcessors() can produce misleading results. getSystemLoadAverage() returns a value typically between 0 and the number of processors (e.g., 0-4 for a 4-core system). Multiplying by 100 and dividing by processor count doesn't yield a proper percentage. For example, on a 4-core system with load of 2.0, this calculates (2.0 * 100) / 4 = 50, but the actual CPU load percentage should be (2.0 / 4) * 100 = 50. The order of operations matters for precision and semantics. ```suggestion sb.append(COMMA_SEPARATOR).append("load:").append( (operatingSystemMXBean.getSystemLoadAverage() / operatingSystemMXBean.getAvailableProcessors()) * 100 ); ``` ########## dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AdaptiveMetrics.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc; + +import org.apache.dubbo.common.utils.StringUtils; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * adaptive Metrics statistics. + */ +public class AdaptiveMetrics { + + private ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new ConcurrentHashMap<>(); + + private long currentProviderTime = 0; + private double providerCPULoad = 0; + private long lastLatency = 0; + private long currentTime = 0; + + //Allow some time disorder + private long pickTime = System.currentTimeMillis(); + + private double beta = 0.5; + private final AtomicLong consumerReq = new AtomicLong(); + private final AtomicLong consumerSuccess = new AtomicLong(); + private final AtomicLong errorReq = new AtomicLong(); + private double ewma = 0; + + public double getLoad(String idKey,int weight,int timeout){ + AdaptiveMetrics metrics = getStatus(idKey); + + //If the time more than 2 times, mandatory selected + if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) { + return 0; + } + + if (metrics.currentTime > 0){ + long multiple = (System.currentTimeMillis() - metrics.currentTime) / timeout + 1; + if (multiple > 0) { + if (metrics.currentProviderTime == metrics.currentTime) { + //penalty value + metrics.lastLatency = timeout * 2L; + }else { + metrics.lastLatency = metrics.lastLatency >> multiple; + } + metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * metrics.lastLatency; + metrics.currentTime = System.currentTimeMillis(); + } + } + + long inflight = metrics.consumerReq.get() - metrics.consumerSuccess.get() - metrics.errorReq.get(); + return metrics.providerCPULoad * (Math.sqrt(metrics.ewma) + 1) * (inflight + 1) / ((((double)metrics.consumerSuccess.get() / (double)(metrics.consumerReq.get() + 1)) * weight) + 1); Review Comment: Complex load calculation formula lacks documentation and explanation. The formula on line 71 combines CPU load, EWMA, inflight requests, and success rate in a specific way, but there's no explanation of why this particular formula was chosen or how each component contributes to the final load score. Adding detailed comments explaining the rationale and expected behavior would improve maintainability. For example, explain why sqrt(ewma) is used instead of ewma directly, and what the +1 terms prevent. ########## dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/AdaptiveLoadBalanceFilter.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.filter; + +import org.apache.dubbo.common.constants.LoadbalanceRules; +import org.apache.dubbo.common.extension.Activate; +import org.apache.dubbo.common.resource.GlobalResourcesRepository; +import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.rpc.AdaptiveMetrics; +import org.apache.dubbo.rpc.Constants; +import org.apache.dubbo.rpc.Filter; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.model.ApplicationModel; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN; +import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER; +import static org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY; + +/** + * if the load balance is adaptive ,set attachment to get the metrics of the server + * @see org.apache.dubbo.rpc.Filter + * @see org.apache.dubbo.rpc.RpcContext + */ +@Activate(group = CONSUMER, order = -200000, value = {"loadbalance:adaptive"}) +public class AdaptiveLoadBalanceFilter implements Filter, Filter.Listener { + + /** + * uses a single worker thread operating off an bounded queue + */ + private volatile ThreadPoolExecutor executor = null; + + private AdaptiveMetrics adaptiveMetrics; + + public AdaptiveLoadBalanceFilter(ApplicationModel scopeModel) { + adaptiveMetrics = scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class); + } + + private ThreadPoolExecutor getExecutor(){ + if (null == executor) { + synchronized (this) { + if (null == executor) { + executor = new ThreadPoolExecutor(1, 1, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), + new NamedInternalThreadFactory("Dubbo-framework-loadbalance-adaptive", true), new ThreadPoolExecutor.DiscardOldestPolicy()); + GlobalResourcesRepository.getInstance().registerDisposable(() -> this.executor.shutdown()); + } + } + } + return executor; + } + + @Override + public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { + return invoker.invoke(invocation); + } + + private String buildServiceKey(Invocation invocation){ + StringBuilder sb = new StringBuilder(128); + sb.append(invocation.getInvoker().getUrl().getAddress()).append(":").append(invocation.getProtocolServiceKey()); + return sb.toString(); + } + + private String getServiceKey(Invocation invocation){ + + String key = (String) invocation.getAttributes().get(invocation.getInvoker()); + if (StringUtils.isNotEmpty(key)){ + return key; + } + + key = buildServiceKey(invocation); + invocation.getAttributes().put(invocation.getInvoker(),key); Review Comment: Invoker object is used as a map key in invocation attributes. On line 88, the invoker object itself is used as a key in invocation.getAttributes(). This is unusual and potentially problematic because invokers are heavyweight objects and using them as map keys may not provide the expected behavior if invoker instances are recreated. Consider using a string-based key or the invoker's URL as the key instead. ########## dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AdaptiveMetrics.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc; + +import org.apache.dubbo.common.utils.StringUtils; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * adaptive Metrics statistics. + */ +public class AdaptiveMetrics { + + private ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new ConcurrentHashMap<>(); + + private long currentProviderTime = 0; + private double providerCPULoad = 0; + private long lastLatency = 0; + private long currentTime = 0; + + //Allow some time disorder + private long pickTime = System.currentTimeMillis(); + + private double beta = 0.5; + private final AtomicLong consumerReq = new AtomicLong(); + private final AtomicLong consumerSuccess = new AtomicLong(); + private final AtomicLong errorReq = new AtomicLong(); + private double ewma = 0; + + public double getLoad(String idKey,int weight,int timeout){ + AdaptiveMetrics metrics = getStatus(idKey); + + //If the time more than 2 times, mandatory selected + if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) { + return 0; + } + + if (metrics.currentTime > 0){ + long multiple = (System.currentTimeMillis() - metrics.currentTime) / timeout + 1; + if (multiple > 0) { + if (metrics.currentProviderTime == metrics.currentTime) { + //penalty value + metrics.lastLatency = timeout * 2L; + }else { + metrics.lastLatency = metrics.lastLatency >> multiple; + } + metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * metrics.lastLatency; + metrics.currentTime = System.currentTimeMillis(); + } + } + + long inflight = metrics.consumerReq.get() - metrics.consumerSuccess.get() - metrics.errorReq.get(); + return metrics.providerCPULoad * (Math.sqrt(metrics.ewma) + 1) * (inflight + 1) / ((((double)metrics.consumerSuccess.get() / (double)(metrics.consumerReq.get() + 1)) * weight) + 1); Review Comment: Division by zero vulnerability exists when metrics.consumerReq.get() is 0. The expression (metrics.consumerSuccess.get() / (metrics.consumerReq.get() + 1)) will calculate success rate incorrectly. When consumerReq is 0, adding 1 makes it 1, so a success count of 0 would give 0/1 = 0. However, this should be handled more explicitly, and the calculation logic seems flawed: the success rate should be consumerSuccess / consumerReq (with proper zero check), not consumerSuccess / (consumerReq + 1). ########## dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/AdaptiveLoadBalanceFilter.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.filter; + +import org.apache.dubbo.common.constants.LoadbalanceRules; +import org.apache.dubbo.common.extension.Activate; +import org.apache.dubbo.common.resource.GlobalResourcesRepository; +import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.rpc.AdaptiveMetrics; +import org.apache.dubbo.rpc.Constants; +import org.apache.dubbo.rpc.Filter; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.model.ApplicationModel; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN; +import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER; +import static org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY; + +/** + * if the load balance is adaptive ,set attachment to get the metrics of the server + * @see org.apache.dubbo.rpc.Filter + * @see org.apache.dubbo.rpc.RpcContext + */ +@Activate(group = CONSUMER, order = -200000, value = {"loadbalance:adaptive"}) Review Comment: The @Activate value syntax appears incorrect. The value array should contain URL parameter keys (e.g., "actives", "cache"), not a colon-separated format like "loadbalance:adaptive". Based on the Dubbo framework conventions, this filter should likely check if the loadbalance parameter equals "adaptive". Consider using just "loadbalance" as the value or implementing custom activation logic in the invoke method. ```suggestion @Activate(group = CONSUMER, order = -200000, value = {LOADBALANCE_KEY}) ``` ########## dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/AdaptiveLoadBalanceTest.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.cluster.loadbalance; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.rpc.AdaptiveMetrics; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.model.ApplicationModel; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +class AdaptiveLoadBalanceTest extends LoadBalanceBaseTest { + + private ApplicationModel scopeModel; + + private AdaptiveMetrics adaptiveMetrics; + + @Test + @Order(0) + void testSelectByWeight() { + int sumInvoker1 = 0; + int sumInvoker2 = 0; + int sumInvoker3 = 0; + int loop = 10000; + + ApplicationModel scopeModel = ApplicationModel.defaultModel(); + + AdaptiveLoadBalance lb = new AdaptiveLoadBalance(scopeModel); + for (int i = 0; i < loop; i++) { + Invoker selected = lb.select(weightInvokers, null, weightTestInvocation); + + if (selected.getUrl().getProtocol().equals("test1")) { + sumInvoker1++; + } + + if (selected.getUrl().getProtocol().equals("test2")) { + sumInvoker2++; + } + + if (selected.getUrl().getProtocol().equals("test3")) { + sumInvoker3++; + } + } + + // 1 : 9 : 6 + System.out.println(sumInvoker1); + System.out.println(sumInvoker2); + System.out.println(sumInvoker3); + Assertions.assertEquals(sumInvoker1 + sumInvoker2 + sumInvoker3, loop, "select failed!"); + } + + private String buildServiceKey(Invoker invoker){ + URL url = invoker.getUrl(); + return url.getAddress() + ":" + invocation.getProtocolServiceKey(); + } + + private AdaptiveMetrics getAdaptiveMetricsInstance(){ + if (adaptiveMetrics == null) { + adaptiveMetrics = scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class); + } + return adaptiveMetrics; + } + + @Test + @Order(1) + void testSelectByAdaptive() { + int sumInvoker1 = 0; + int sumInvoker2 = 0; + int sumInvoker5 = 0; + int loop = 10000; + + scopeModel = ApplicationModel.defaultModel(); + AdaptiveLoadBalance lb = new AdaptiveLoadBalance(scopeModel); + + lb.select(weightInvokersSR, null, weightTestInvocation); + + for (int i = 0; i < loop; i++) { + Invoker selected = lb.select(weightInvokersSR, null, weightTestInvocation); + + Map<String, String> metricsMap = new HashMap<>(); + String idKey = buildServiceKey(selected); + + if (selected.getUrl().getProtocol().equals("test1")) { + sumInvoker1++; + metricsMap.put("rt", "10"); + metricsMap.put("load", "10"); + metricsMap.put("curTime", String.valueOf(System.currentTimeMillis()-10)); + getAdaptiveMetricsInstance().addConsumerSuccess(idKey); + } + + if (selected.getUrl().getProtocol().equals("test2")) { + sumInvoker2++; + metricsMap.put("rt", "100"); + metricsMap.put("load", "40"); + metricsMap.put("curTime", String.valueOf(System.currentTimeMillis()-100)); + getAdaptiveMetricsInstance().addConsumerSuccess(idKey); + } + + if (selected.getUrl().getProtocol().equals("test5")) { + metricsMap.put("rt", "5000"); + metricsMap.put("load", "400");//400% + metricsMap.put("curTime", String.valueOf(System.currentTimeMillis() - 5000)); + + getAdaptiveMetricsInstance().addErrorReq(idKey); + sumInvoker5++; + } + getAdaptiveMetricsInstance().setProviderMetrics(idKey,metricsMap); + + } + Map<Invoker<LoadBalanceBaseTest>, Integer> weightMap = weightInvokersSR.stream() + .collect(Collectors.toMap(Function.identity(), e -> Integer.valueOf(e.getUrl().getParameter("weight")))); Review Comment: Potential uncaught 'java.lang.NumberFormatException'. ########## dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AdaptiveMetrics.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc; + +import org.apache.dubbo.common.utils.StringUtils; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * adaptive Metrics statistics. + */ +public class AdaptiveMetrics { + + private ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new ConcurrentHashMap<>(); + + private long currentProviderTime = 0; + private double providerCPULoad = 0; + private long lastLatency = 0; + private long currentTime = 0; + + //Allow some time disorder + private long pickTime = System.currentTimeMillis(); + + private double beta = 0.5; + private final AtomicLong consumerReq = new AtomicLong(); + private final AtomicLong consumerSuccess = new AtomicLong(); + private final AtomicLong errorReq = new AtomicLong(); + private double ewma = 0; + + public double getLoad(String idKey,int weight,int timeout){ + AdaptiveMetrics metrics = getStatus(idKey); + + //If the time more than 2 times, mandatory selected + if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) { + return 0; + } + + if (metrics.currentTime > 0){ + long multiple = (System.currentTimeMillis() - metrics.currentTime) / timeout + 1; + if (multiple > 0) { + if (metrics.currentProviderTime == metrics.currentTime) { + //penalty value + metrics.lastLatency = timeout * 2L; + }else { + metrics.lastLatency = metrics.lastLatency >> multiple; + } + metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * metrics.lastLatency; + metrics.currentTime = System.currentTimeMillis(); + } + } + + long inflight = metrics.consumerReq.get() - metrics.consumerSuccess.get() - metrics.errorReq.get(); + return metrics.providerCPULoad * (Math.sqrt(metrics.ewma) + 1) * (inflight + 1) / ((((double)metrics.consumerSuccess.get() / (double)(metrics.consumerReq.get() + 1)) * weight) + 1); + } + + public AdaptiveMetrics getStatus(String idKey){ + return metricsStatistics.computeIfAbsent(idKey, k -> new AdaptiveMetrics()); + } + + public void addConsumerReq(String idKey){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.consumerReq.incrementAndGet(); + } + + public void addConsumerSuccess(String idKey){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.consumerSuccess.incrementAndGet(); + } + + public void addErrorReq(String idKey){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.errorReq.incrementAndGet(); + } + + public void setPickTime(String idKey,long time){ + AdaptiveMetrics metrics = getStatus(idKey); + metrics.pickTime = time; + } + + + + public void setProviderMetrics(String idKey,Map<String,String> metricsMap){ + + AdaptiveMetrics metrics = getStatus(idKey); + + long serviceTime = Long.parseLong(Optional.ofNullable(metricsMap.get("curTime")).filter(v -> StringUtils.isNumeric(v,false)).orElse("0")); Review Comment: Potential uncaught 'java.lang.NumberFormatException'. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
