Copilot commented on code in PR #10745:
URL: https://github.com/apache/dubbo/pull/10745#discussion_r2740274964


##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AdaptiveMetrics.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import org.apache.dubbo.common.utils.StringUtils;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * adaptive Metrics statistics.
+ */
+public class AdaptiveMetrics {
+
+    private ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new 
ConcurrentHashMap<>();
+
+    private long currentProviderTime = 0;
+    private double providerCPULoad = 0;
+    private long lastLatency = 0;
+    private long currentTime = 0;
+
+    //Allow some time disorder
+    private long pickTime = System.currentTimeMillis();
+
+    private double beta = 0.5;
+    private final AtomicLong consumerReq = new AtomicLong();
+    private final AtomicLong consumerSuccess = new AtomicLong();
+    private final AtomicLong errorReq = new AtomicLong();
+    private double ewma = 0;
+
+    public double getLoad(String idKey,int weight,int timeout){
+        AdaptiveMetrics metrics = getStatus(idKey);
+
+        //If the time more than 2 times, mandatory selected
+        if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) {
+            return 0;
+        }
+
+        if (metrics.currentTime > 0){
+            long multiple = (System.currentTimeMillis() - metrics.currentTime) 
/ timeout + 1;
+            if (multiple > 0) {
+                if (metrics.currentProviderTime == metrics.currentTime) {
+                    //penalty value
+                    metrics.lastLatency = timeout * 2L;
+                }else {
+                    metrics.lastLatency = metrics.lastLatency >> multiple;

Review Comment:
   Potential bit shift issue with negative values. The expression 
metrics.lastLatency >> multiple performs an arithmetic right shift which 
preserves the sign bit. If lastLatency becomes negative (which shouldn't happen 
in normal operation but isn't explicitly prevented), this could produce 
unexpected results. Consider using unsigned right shift (>>>) or adding 
validation to ensure lastLatency remains non-negative.



##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AdaptiveMetrics.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import org.apache.dubbo.common.utils.StringUtils;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * adaptive Metrics statistics.
+ */
+public class AdaptiveMetrics {
+
+    private ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new 
ConcurrentHashMap<>();
+
+    private long currentProviderTime = 0;
+    private double providerCPULoad = 0;
+    private long lastLatency = 0;
+    private long currentTime = 0;
+
+    //Allow some time disorder
+    private long pickTime = System.currentTimeMillis();
+
+    private double beta = 0.5;
+    private final AtomicLong consumerReq = new AtomicLong();
+    private final AtomicLong consumerSuccess = new AtomicLong();
+    private final AtomicLong errorReq = new AtomicLong();
+    private double ewma = 0;
+
+    public double getLoad(String idKey,int weight,int timeout){
+        AdaptiveMetrics metrics = getStatus(idKey);
+
+        //If the time more than 2 times, mandatory selected
+        if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) {
+            return 0;
+        }
+
+        if (metrics.currentTime > 0){
+            long multiple = (System.currentTimeMillis() - metrics.currentTime) 
/ timeout + 1;
+            if (multiple > 0) {
+                if (metrics.currentProviderTime == metrics.currentTime) {
+                    //penalty value
+                    metrics.lastLatency = timeout * 2L;
+                }else {
+                    metrics.lastLatency = metrics.lastLatency >> multiple;
+                }
+                metrics.ewma = metrics.beta * metrics.ewma + (1 - 
metrics.beta) * metrics.lastLatency;
+                metrics.currentTime = System.currentTimeMillis();
+            }
+        }
+
+        long inflight = metrics.consumerReq.get() - 
metrics.consumerSuccess.get() - metrics.errorReq.get();
+        return metrics.providerCPULoad * (Math.sqrt(metrics.ewma) + 1) * 
(inflight + 1) / ((((double)metrics.consumerSuccess.get() / 
(double)(metrics.consumerReq.get() + 1)) * weight) + 1);
+    }
+
+    public AdaptiveMetrics getStatus(String idKey){
+        return metricsStatistics.computeIfAbsent(idKey, k -> new 
AdaptiveMetrics());
+    }
+
+    public void addConsumerReq(String idKey){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.consumerReq.incrementAndGet();
+    }
+
+    public void addConsumerSuccess(String idKey){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.consumerSuccess.incrementAndGet();
+    }
+
+    public void addErrorReq(String idKey){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.errorReq.incrementAndGet();
+    }
+
+    public void setPickTime(String idKey,long time){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.pickTime = time;
+    }
+
+
+
+    public void setProviderMetrics(String idKey,Map<String,String> metricsMap){
+
+        AdaptiveMetrics metrics = getStatus(idKey);
+
+        long serviceTime = 
Long.parseLong(Optional.ofNullable(metricsMap.get("curTime")).filter(v -> 
StringUtils.isNumeric(v,false)).orElse("0"));
+        //If server time is less than the current time, discard
+        if (metrics.currentProviderTime > serviceTime){
+            return;
+        }
+
+        metrics.currentProviderTime = serviceTime;
+        metrics.currentTime = serviceTime;
+        metrics.providerCPULoad = 
Double.parseDouble(Optional.ofNullable(metricsMap.get("load")).filter(v -> 
StringUtils.isNumeric(v,true)).orElse("0"));
+        metrics.lastLatency = 
Long.parseLong((Optional.ofNullable(metricsMap.get("rt")).filter(v -> 
StringUtils.isNumeric(v,false)).orElse("0")));
+
+        metrics.beta = 0.5;

Review Comment:
   Magic number without explanation. The value 0.5 for beta is hardcoded 
without documentation explaining why this specific value was chosen for the 
exponential weighted moving average calculation. Consider adding a comment 
explaining this is a standard EWMA smoothing factor or making it configurable 
if different scenarios require different values.



##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AdaptiveMetrics.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import org.apache.dubbo.common.utils.StringUtils;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * adaptive Metrics statistics.
+ */
+public class AdaptiveMetrics {
+
+    private ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new 
ConcurrentHashMap<>();
+
+    private long currentProviderTime = 0;
+    private double providerCPULoad = 0;
+    private long lastLatency = 0;
+    private long currentTime = 0;
+
+    //Allow some time disorder
+    private long pickTime = System.currentTimeMillis();
+
+    private double beta = 0.5;
+    private final AtomicLong consumerReq = new AtomicLong();
+    private final AtomicLong consumerSuccess = new AtomicLong();
+    private final AtomicLong errorReq = new AtomicLong();
+    private double ewma = 0;
+
+    public double getLoad(String idKey,int weight,int timeout){
+        AdaptiveMetrics metrics = getStatus(idKey);
+
+        //If the time more than 2 times, mandatory selected
+        if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) {
+            return 0;
+        }
+
+        if (metrics.currentTime > 0){
+            long multiple = (System.currentTimeMillis() - metrics.currentTime) 
/ timeout + 1;
+            if (multiple > 0) {
+                if (metrics.currentProviderTime == metrics.currentTime) {
+                    //penalty value
+                    metrics.lastLatency = timeout * 2L;
+                }else {
+                    metrics.lastLatency = metrics.lastLatency >> multiple;
+                }
+                metrics.ewma = metrics.beta * metrics.ewma + (1 - 
metrics.beta) * metrics.lastLatency;
+                metrics.currentTime = System.currentTimeMillis();
+            }
+        }
+
+        long inflight = metrics.consumerReq.get() - 
metrics.consumerSuccess.get() - metrics.errorReq.get();
+        return metrics.providerCPULoad * (Math.sqrt(metrics.ewma) + 1) * 
(inflight + 1) / ((((double)metrics.consumerSuccess.get() / 
(double)(metrics.consumerReq.get() + 1)) * weight) + 1);
+    }
+
+    public AdaptiveMetrics getStatus(String idKey){
+        return metricsStatistics.computeIfAbsent(idKey, k -> new 
AdaptiveMetrics());
+    }
+
+    public void addConsumerReq(String idKey){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.consumerReq.incrementAndGet();
+    }
+
+    public void addConsumerSuccess(String idKey){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.consumerSuccess.incrementAndGet();
+    }
+
+    public void addErrorReq(String idKey){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.errorReq.incrementAndGet();
+    }
+
+    public void setPickTime(String idKey,long time){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.pickTime = time;
+    }
+
+
+
+    public void setProviderMetrics(String idKey,Map<String,String> metricsMap){
+
+        AdaptiveMetrics metrics = getStatus(idKey);
+
+        long serviceTime = 
Long.parseLong(Optional.ofNullable(metricsMap.get("curTime")).filter(v -> 
StringUtils.isNumeric(v,false)).orElse("0"));
+        //If server time is less than the current time, discard
+        if (metrics.currentProviderTime > serviceTime){
+            return;
+        }
+
+        metrics.currentProviderTime = serviceTime;
+        metrics.currentTime = serviceTime;
+        metrics.providerCPULoad = 
Double.parseDouble(Optional.ofNullable(metricsMap.get("load")).filter(v -> 
StringUtils.isNumeric(v,true)).orElse("0"));
+        metrics.lastLatency = 
Long.parseLong((Optional.ofNullable(metricsMap.get("rt")).filter(v -> 
StringUtils.isNumeric(v,false)).orElse("0")));
+
+        metrics.beta = 0.5;
+        //Vt =  β * Vt-1 + (1 -  β ) * θt
+        metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * 
metrics.lastLatency;
+

Review Comment:
   Thread-safety issue: Multiple threads can access and modify metrics fields 
(currentProviderTime, providerCPULoad, lastLatency, currentTime, ewma) without 
synchronization. The ConcurrentHashMap ensures that different AdaptiveMetrics 
instances are handled correctly, but within a single instance, these fields are 
not thread-safe. Consider using synchronized methods or atomic variables for 
these fields.
   ```suggestion
   
           synchronized (metrics) {
               //If server time is less than the current time, discard
               if (metrics.currentProviderTime > serviceTime){
                   return;
               }
   
               metrics.currentProviderTime = serviceTime;
               metrics.currentTime = serviceTime;
               metrics.providerCPULoad = 
Double.parseDouble(Optional.ofNullable(metricsMap.get("load")).filter(v -> 
StringUtils.isNumeric(v,true)).orElse("0"));
               metrics.lastLatency = 
Long.parseLong((Optional.ofNullable(metricsMap.get("rt")).filter(v -> 
StringUtils.isNumeric(v,false)).orElse("0")));
   
               metrics.beta = 0.5;
               //Vt =  β * Vt-1 + (1 -  β ) * θt
               metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) 
* metrics.lastLatency;
           }
   ```



##########
dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/AdaptiveLoadBalanceTest.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.loadbalance;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.AdaptiveMetrics;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+class AdaptiveLoadBalanceTest extends LoadBalanceBaseTest {
+
+    private ApplicationModel scopeModel;
+
+    private AdaptiveMetrics adaptiveMetrics;
+
+    @Test
+    @Order(0)
+    void testSelectByWeight() {
+        int sumInvoker1 = 0;
+        int sumInvoker2 = 0;
+        int sumInvoker3 = 0;
+        int loop = 10000;
+
+        ApplicationModel scopeModel = ApplicationModel.defaultModel();
+
+        AdaptiveLoadBalance lb = new AdaptiveLoadBalance(scopeModel);
+        for (int i = 0; i < loop; i++) {
+            Invoker selected = lb.select(weightInvokers, null, 
weightTestInvocation);
+
+            if (selected.getUrl().getProtocol().equals("test1")) {
+                sumInvoker1++;
+            }
+
+            if (selected.getUrl().getProtocol().equals("test2")) {
+                sumInvoker2++;
+            }
+
+            if (selected.getUrl().getProtocol().equals("test3")) {
+                sumInvoker3++;
+            }
+        }
+
+        // 1 : 9 : 6
+        System.out.println(sumInvoker1);
+        System.out.println(sumInvoker2);
+        System.out.println(sumInvoker3);
+        Assertions.assertEquals(sumInvoker1 + sumInvoker2 + sumInvoker3, loop, 
"select failed!");
+    }
+
+    private String buildServiceKey(Invoker invoker){
+        URL url = invoker.getUrl();
+        return url.getAddress() + ":" + invocation.getProtocolServiceKey();
+    }
+
+    private AdaptiveMetrics getAdaptiveMetricsInstance(){
+        if (adaptiveMetrics == null) {
+            adaptiveMetrics = 
scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class);
+        }
+        return adaptiveMetrics;
+    }
+
+    @Test
+    @Order(1)
+    void testSelectByAdaptive() {
+        int sumInvoker1 = 0;
+        int sumInvoker2 = 0;
+        int sumInvoker5 = 0;
+        int loop = 10000;
+
+        scopeModel = ApplicationModel.defaultModel();
+        AdaptiveLoadBalance lb = new AdaptiveLoadBalance(scopeModel);
+
+        lb.select(weightInvokersSR, null, weightTestInvocation);
+
+        for (int i = 0; i < loop; i++) {
+            Invoker selected = lb.select(weightInvokersSR, null, 
weightTestInvocation);
+
+            Map<String, String> metricsMap = new HashMap<>();
+            String idKey = buildServiceKey(selected);
+
+            if (selected.getUrl().getProtocol().equals("test1")) {
+                sumInvoker1++;
+                metricsMap.put("rt", "10");
+                metricsMap.put("load", "10");
+                metricsMap.put("curTime", 
String.valueOf(System.currentTimeMillis()-10));
+                getAdaptiveMetricsInstance().addConsumerSuccess(idKey);
+            }
+
+            if (selected.getUrl().getProtocol().equals("test2")) {
+                sumInvoker2++;
+                metricsMap.put("rt", "100");
+                metricsMap.put("load", "40");
+                metricsMap.put("curTime", 
String.valueOf(System.currentTimeMillis()-100));
+                getAdaptiveMetricsInstance().addConsumerSuccess(idKey);
+            }
+
+            if (selected.getUrl().getProtocol().equals("test5")) {
+                metricsMap.put("rt", "5000");
+                metricsMap.put("load", "400");//400%
+                metricsMap.put("curTime", 
String.valueOf(System.currentTimeMillis() - 5000));
+
+                getAdaptiveMetricsInstance().addErrorReq(idKey);
+                sumInvoker5++;
+            }
+            getAdaptiveMetricsInstance().setProviderMetrics(idKey,metricsMap);
+
+        }
+        Map<Invoker<LoadBalanceBaseTest>, Integer> weightMap = 
weightInvokersSR.stream()
+            .collect(Collectors.toMap(Function.identity(), e -> 
Integer.valueOf(e.getUrl().getParameter("weight"))));
+        Integer totalWeight = weightMap.values().stream().reduce(0, 
Integer::sum);
+        // max deviation = expectWeightValue * 2
+        int expectWeightValue = loop / totalWeight;
+        int maxDeviation = expectWeightValue * 2;
+        double beta = 0.5;
+        //this EMA is an approximate value
+        double ewma1 = beta * 50 + (1 - beta) * 10;
+        double ewma2 = beta * 50 + (1 - beta) * 100;
+        double ewma5 = beta * 50 + (1 - beta) * 5000;
+
+        AtomicInteger weight1 = new AtomicInteger();
+        AtomicInteger weight2 = new AtomicInteger();
+        AtomicInteger weight5 = new AtomicInteger();
+        weightMap.forEach((k, v) ->{
+            if (k.getUrl().getProtocol().equals("test1")){
+                weight1.set(v);
+            }
+            else if (k.getUrl().getProtocol().equals("test2")){
+                weight2.set(v);
+            }
+            else if (k.getUrl().getProtocol().equals("test5")){
+                weight5.set(v);
+            }
+        });
+
+        Assertions.assertEquals(sumInvoker1 + sumInvoker2 + sumInvoker5, loop, 
"select failed!");
+        Assertions.assertTrue(Math.abs(sumInvoker1 / 
(weightMap.get(weightInvoker1) * ewma1) - expectWeightValue) < maxDeviation, 
"select failed!");
+        Assertions.assertTrue(Math.abs(sumInvoker2 / 
(weightMap.get(weightInvoker2) * ewma2) - expectWeightValue) < maxDeviation, 
"select failed!");
+        Assertions.assertTrue(Math.abs(sumInvoker5 / 
(weightMap.get(weightInvoker5) * ewma5) - expectWeightValue) < maxDeviation, 
"select failed!");

Review Comment:
   The test assertion logic is incorrect. Lines 163-165 attempt to calculate 
expected values based on weight and ewma, but the division operations don't 
match the load calculation formula used in the actual code. In 
AdaptiveMetrics.getLoad(), the formula divides by ((successRate * weight) + 1), 
not by (weight * ewma). The test's assertion logic should match the actual 
implementation's load calculation formula for accurate verification.
   ```suggestion
           // successRate is 1.0 for test1 and test2 (all successes), and 0.0 for test5 (all errors) in this test.
           Assertions.assertTrue(Math.abs(sumInvoker1 * ewma1 / ((1.0 * 
weight1.get()) + 1) - expectWeightValue) < maxDeviation, "select failed!");
           Assertions.assertTrue(Math.abs(sumInvoker2 * ewma2 / ((1.0 * 
weight2.get()) + 1) - expectWeightValue) < maxDeviation, "select failed!");
           Assertions.assertTrue(Math.abs(sumInvoker5 * ewma5 / ((0.0 * 
weight5.get()) + 1) - expectWeightValue) < maxDeviation, "select failed!");
   ```



##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/AdaptiveLoadBalanceFilter.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.filter;
+
+import org.apache.dubbo.common.constants.LoadbalanceRules;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.common.resource.GlobalResourcesRepository;
+import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.rpc.AdaptiveMetrics;
+import org.apache.dubbo.rpc.Constants;
+import org.apache.dubbo.rpc.Filter;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
+import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
+import static 
org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY;
+
+/**
+ * if the load balance is adaptive ,set attachment to get the metrics of the 
server
+ * @see org.apache.dubbo.rpc.Filter
+ * @see org.apache.dubbo.rpc.RpcContext
+ */
+@Activate(group = CONSUMER, order = -200000, value = {"loadbalance:adaptive"})
+public class AdaptiveLoadBalanceFilter implements Filter, Filter.Listener {
+
+    /**
+     * uses a single worker thread operating off an bounded queue
+     */
+    private volatile ThreadPoolExecutor executor = null;
+
+    private AdaptiveMetrics adaptiveMetrics;
+
+    public AdaptiveLoadBalanceFilter(ApplicationModel scopeModel) {
+        adaptiveMetrics = 
scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class);
+    }
+
+    private ThreadPoolExecutor getExecutor(){
+        if (null == executor) {
+            synchronized (this) {
+                if (null == executor) {
+                    executor = new ThreadPoolExecutor(1, 1, 
0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
+                        new 
NamedInternalThreadFactory("Dubbo-framework-loadbalance-adaptive", true), new 
ThreadPoolExecutor.DiscardOldestPolicy());
+                    
GlobalResourcesRepository.getInstance().registerDisposable(() -> 
this.executor.shutdown());
+                }
+            }
+        }
+        return executor;
+    }
+
+    @Override
+    public Result invoke(Invoker<?> invoker, Invocation invocation) throws 
RpcException {
+        return invoker.invoke(invocation);
+    }
+
+    private String buildServiceKey(Invocation invocation){
+        StringBuilder sb = new StringBuilder(128);
+        
sb.append(invocation.getInvoker().getUrl().getAddress()).append(":").append(invocation.getProtocolServiceKey());
+        return sb.toString();
+    }
+
+    private String getServiceKey(Invocation invocation){
+
+        String key = (String) 
invocation.getAttributes().get(invocation.getInvoker());
+        if (StringUtils.isNotEmpty(key)){
+            return key;
+        }
+
+        key = buildServiceKey(invocation);
+        invocation.getAttributes().put(invocation.getInvoker(),key);
+        return key;
+    }
+
+    @Override
+    public void onResponse(Result appResponse, Invoker<?> invoker, Invocation 
invocation) {
+
+        try {
+            String loadBalance = (String) 
invocation.getAttributes().get(LOADBALANCE_KEY);
+            if (StringUtils.isEmpty(loadBalance)
+                || !LoadbalanceRules.ADAPTIVE.equals(loadBalance)) {
+                return;
+            }
+            adaptiveMetrics.addConsumerSuccess(getServiceKey(invocation));
+            String attachment = 
appResponse.getAttachment(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY);
+            if (StringUtils.isNotEmpty(attachment)) {
+                String[] parties = COMMA_SPLIT_PATTERN.split(attachment);
+                if (parties.length == 0) {
+                    return;
+                }
+                Map<String, String> metricsMap = new HashMap<>();
+                for (String party : parties) {
+                    String[] groups = party.split(":");
+                    if (groups.length != 2) {
+                        continue;
+                    }
+                    metricsMap.put(groups[0], groups[1]);
+                }
+
+                Long startTime = (Long) 
invocation.getAttributes().get(Constants.ADAPTIVE_LOADBALANCE_START_TIME);
+                if (null != startTime) {
+                    metricsMap.put("rt", 
String.valueOf(System.currentTimeMillis() - startTime));
+                }
+
+                getExecutor().execute(() -> {
+                    
adaptiveMetrics.setProviderMetrics(getServiceKey(invocation), metricsMap);
+                });
+            }
+        }
+        finally {
+            
appResponse.getAttachments().remove(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY);
+        }
+
+    }
+
+    @Override
+    public void onError(Throwable t, Invoker<?> invoker, Invocation 
invocation) {
+        String loadBalance = (String) 
invocation.getAttributes().get(LOADBALANCE_KEY);
+        if (StringUtils.isNotEmpty(loadBalance)
+            && LoadbalanceRules.ADAPTIVE.equals(loadBalance)) {
+            getExecutor().execute(() -> {
+                adaptiveMetrics.addErrorReq(getServiceKey(invocation));
+            });
+        }
+    }
+
+
+}

Review Comment:
   Missing test coverage for AdaptiveLoadBalanceFilter. The repository has 
comprehensive test coverage for other filters in the same package 
(ActiveLimitFilterTest, ContextFilterTest, etc.), but there's no test file for 
AdaptiveLoadBalanceFilter. Unit tests should be added to verify the filter's 
behavior including attachment handling, metrics collection, and error scenarios.



##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AdaptiveMetrics.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import org.apache.dubbo.common.utils.StringUtils;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * adaptive Metrics statistics.
+ */
+public class AdaptiveMetrics {
+
+    private ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new 
ConcurrentHashMap<>();
+
+    private long currentProviderTime = 0;
+    private double providerCPULoad = 0;
+    private long lastLatency = 0;
+    private long currentTime = 0;
+
+    //Allow some time disorder
+    private long pickTime = System.currentTimeMillis();
+
+    private double beta = 0.5;
+    private final AtomicLong consumerReq = new AtomicLong();
+    private final AtomicLong consumerSuccess = new AtomicLong();
+    private final AtomicLong errorReq = new AtomicLong();
+    private double ewma = 0;
+
+    public double getLoad(String idKey,int weight,int timeout){
+        AdaptiveMetrics metrics = getStatus(idKey);
+
+        //If the time more than 2 times, mandatory selected
+        if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) {
+            return 0;
+        }
+
+        if (metrics.currentTime > 0){
+            long multiple = (System.currentTimeMillis() - metrics.currentTime) 
/ timeout + 1;
+            if (multiple > 0) {
+                if (metrics.currentProviderTime == metrics.currentTime) {
+                    //penalty value
+                    metrics.lastLatency = timeout * 2L;
+                }else {
+                    metrics.lastLatency = metrics.lastLatency >> multiple;
+                }
+                metrics.ewma = metrics.beta * metrics.ewma + (1 - 
metrics.beta) * metrics.lastLatency;
+                metrics.currentTime = System.currentTimeMillis();
+            }

Review Comment:
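   Likely redundant condition check. The condition 'multiple > 0' on line 58 
is true whenever metrics.currentTime is less than timeout milliseconds ahead of 
System.currentTimeMillis(), because multiple is calculated as 
(System.currentTimeMillis() - metrics.currentTime) / timeout + 1 (assuming 
timeout is positive). Note that the enclosing 'currentTime > 0' check does not 
by itself guarantee this: setProviderMetrics() also assigns currentTime from 
the provider-reported curTime, so a provider clock running ahead of the 
consumer by timeout or more would make multiple <= 0. If such clock skew is not 
a concern, this check is unnecessary and can be removed.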
   ```suggestion
               if (metrics.currentProviderTime == metrics.currentTime) {
                   //penalty value
                   metrics.lastLatency = timeout * 2L;
               }else {
                   metrics.lastLatency = metrics.lastLatency >> multiple;
               }
               metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) 
* metrics.lastLatency;
               metrics.currentTime = System.currentTimeMillis();
   ```



##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ProfilerServerFilter.java:
##########
@@ -78,6 +85,18 @@ private void afterInvoke(Invoker<?> invoker, Invocation 
invocation) {
             }
         }
     }
+    private void addAdaptiveResponse(Result appResponse, Invocation 
invocation) {
+        String adaptiveLoadAttachment = 
invocation.getAttachment(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY);
+        if (StringUtils.isNotEmpty(adaptiveLoadAttachment)) {
+            OperatingSystemMXBean operatingSystemMXBean = 
ManagementFactory.getOperatingSystemMXBean();
+
+            StringBuilder sb = new StringBuilder(64);
+            sb.append("curTime:").append(System.currentTimeMillis());
+            
sb.append(COMMA_SEPARATOR).append("load:").append(operatingSystemMXBean.getSystemLoadAverage()
 * 100 / operatingSystemMXBean.getAvailableProcessors() );
+

Review Comment:
   No error handling for OperatingSystemMXBean operations. 
getSystemLoadAverage() can return -1 on systems where load average is not 
available (e.g., Windows). This would result in a negative load value being 
sent in the attachment. Consider adding a check: if (loadAvg < 0) to handle 
this case appropriately.
   ```suggestion
   
               double loadAvg = operatingSystemMXBean.getSystemLoadAverage();
               int availableProcessors = 
operatingSystemMXBean.getAvailableProcessors();
               long loadValue;
               if (loadAvg >= 0 && availableProcessors > 0) {
                   loadValue = Math.round(loadAvg * 100 / availableProcessors);
               } else {
                   // Fallback when system load average is not available or 
processor count is invalid
                   loadValue = 0L;
               }
   
               sb.append(COMMA_SEPARATOR).append("load:").append(loadValue);
   ```



##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java:
##########
@@ -104,9 +104,12 @@ public interface Constants {
     String H2_SETTINGS_MAX_FRAME_SIZE_KEY = "dubbo.rpc.tri.max-frame-size";
     String H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY = 
"dubbo.rpc.tri.max-header-list-size";
 
+    String ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY = "lb_adaptive";
+    String ADAPTIVE_LOADBALANCE_START_TIME = "adaptive_startTime";

Review Comment:
   Constants are inserted in the middle of H2-related settings group, breaking 
logical grouping. The adaptive load balance constants (lines 107-108) are 
placed between H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY and 
H2_SUPPORT_NO_LOWER_HEADER_KEY. Consider moving these constants to a separate 
section or grouping them with other load balance related constants for better 
code organization.



##########
dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/AdaptiveLoadBalance.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.loadbalance;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.LoadbalanceRules;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.rpc.AdaptiveMetrics;
+import org.apache.dubbo.rpc.Constants;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.support.RpcUtils;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static 
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static 
org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY;
+
+/**
+ * AdaptiveLoadBalance
+ * </p>
+ */
+public class AdaptiveLoadBalance extends AbstractLoadBalance {
+
+    public static final String NAME = "adaptive";
+
+    //default key
+    private String attachmentKey = "mem,load";
+
+    private AdaptiveMetrics adaptiveMetrics;
+
+    public AdaptiveLoadBalance(ApplicationModel scopeModel){
+        adaptiveMetrics = 
scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class);
+    }
+
+    @Override
+    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, 
Invocation invocation) {
+        Invoker<T> invoker = selectByP2C(invokers,invocation);
+        
invocation.setAttachment(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY,attachmentKey);
+        long startTime = System.currentTimeMillis();
+        
invocation.getAttributes().put(Constants.ADAPTIVE_LOADBALANCE_START_TIME,startTime);
+        
invocation.getAttributes().put(LOADBALANCE_KEY,LoadbalanceRules.ADAPTIVE);
+        adaptiveMetrics.addConsumerReq(getServiceKey(invoker,invocation));
+        
adaptiveMetrics.setPickTime(getServiceKey(invoker,invocation),startTime);
+
+        return invoker;
+    }
+
+    private <T> Invoker<T> selectByP2C(List<Invoker<T>> invokers, Invocation 
invocation){
+        int length = invokers.size();
+        if(length == 1) {
+            return invokers.get(0);
+        }
+
+        if(length == 2) {
+            return 
chooseLowLoadInvoker(invokers.get(0),invokers.get(1),invocation);
+        }
+
+        int pos1 = ThreadLocalRandom.current().nextInt(length);
+        int pos2 = ThreadLocalRandom.current().nextInt(length - 1);
+        if (pos2 >= pos1) {
+            pos2 = pos2 + 1;
+        }
+
+        return 
chooseLowLoadInvoker(invokers.get(pos1),invokers.get(pos2),invocation);
+    }
+
+    private String getServiceKey(Invoker<?> invoker,Invocation invocation){
+
+        String key = (String) invocation.getAttributes().get(invoker);
+        if (StringUtils.isNotEmpty(key)){
+            return key;
+        }
+
+        key = buildServiceKey(invoker,invocation);
+        invocation.getAttributes().put(invoker,key);
+        return key;
+    }
+
+    private String buildServiceKey(Invoker<?> invoker,Invocation invocation){
+        URL url = invoker.getUrl();
+        StringBuilder sb = new StringBuilder(128);
+        
sb.append(url.getAddress()).append(":").append(invocation.getProtocolServiceKey());
+        return sb.toString();
+    }
+
+    private int getTimeout(Invoker<?> invoker, Invocation invocation) {
+        URL url = invoker.getUrl();
+        String methodName = RpcUtils.getMethodName(invocation);
+        return (int) RpcUtils.getTimeout(url,methodName, 
RpcContext.getClientAttachment(),invocation, DEFAULT_TIMEOUT);
+    }
+
+    private <T> Invoker<T> chooseLowLoadInvoker(Invoker<T> invoker1,Invoker<T> 
invoker2,Invocation invocation){
+        int weight1 = getWeight(invoker1, invocation);
+        int weight2 = getWeight(invoker2, invocation);
+        int timeout1 = getTimeout(invoker2, invocation);

Review Comment:
   timeout1 is computed with getTimeout(invoker2, invocation), so both 
timeout1 and timeout2 end up using invoker2's timeout. Line 113 should use 
invoker1 instead of invoker2 to get the correct timeout value for the first 
invoker.
   ```suggestion
           int timeout1 = getTimeout(invoker1, invocation);
   ```



##########
dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/AdaptiveLoadBalance.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.loadbalance;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.LoadbalanceRules;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.rpc.AdaptiveMetrics;
+import org.apache.dubbo.rpc.Constants;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.support.RpcUtils;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static 
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static 
org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY;
+
+/**
+ * AdaptiveLoadBalance
+ * </p>
+ */
+public class AdaptiveLoadBalance extends AbstractLoadBalance {
+
+    public static final String NAME = "adaptive";
+
+    //default key
+    private String attachmentKey = "mem,load";

Review Comment:
   Field 'attachmentKey' is effectively a constant: it is initialized to a 
default value on line 45 and read on line 56 (where it is set as the 
invocation attachment), but it is never reassigned and cannot be configured 
from outside. If this is intended to be configurable, it should have a setter 
or be initialized from configuration. If it's not meant to be configurable, 
consider making it a constant; removing it is only an option if the value 
used on line 56 is inlined.



##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AdaptiveMetrics.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import org.apache.dubbo.common.utils.StringUtils;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * adaptive Metrics statistics.
+ */
+public class AdaptiveMetrics {
+
+    private ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new 
ConcurrentHashMap<>();
+
+    private long currentProviderTime = 0;
+    private double providerCPULoad = 0;
+    private long lastLatency = 0;
+    private long currentTime = 0;
+
+    //Allow some time disorder
+    private long pickTime = System.currentTimeMillis();
+
+    private double beta = 0.5;
+    private final AtomicLong consumerReq = new AtomicLong();
+    private final AtomicLong consumerSuccess = new AtomicLong();
+    private final AtomicLong errorReq = new AtomicLong();
+    private double ewma = 0;
+
+    public double getLoad(String idKey,int weight,int timeout){
+        AdaptiveMetrics metrics = getStatus(idKey);
+
+        //If the time more than 2 times, mandatory selected
+        if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) {
+            return 0;
+        }
+
+        if (metrics.currentTime > 0){
+            long multiple = (System.currentTimeMillis() - metrics.currentTime) 
/ timeout + 1;
+            if (multiple > 0) {
+                if (metrics.currentProviderTime == metrics.currentTime) {
+                    //penalty value
+                    metrics.lastLatency = timeout * 2L;
+                }else {
+                    metrics.lastLatency = metrics.lastLatency >> multiple;
+                }
+                metrics.ewma = metrics.beta * metrics.ewma + (1 - 
metrics.beta) * metrics.lastLatency;
+                metrics.currentTime = System.currentTimeMillis();
+            }
+        }
+
+        long inflight = metrics.consumerReq.get() - 
metrics.consumerSuccess.get() - metrics.errorReq.get();
+        return metrics.providerCPULoad * (Math.sqrt(metrics.ewma) + 1) * 
(inflight + 1) / ((((double)metrics.consumerSuccess.get() / 
(double)(metrics.consumerReq.get() + 1)) * weight) + 1);
+    }
+
+    public AdaptiveMetrics getStatus(String idKey){
+        return metricsStatistics.computeIfAbsent(idKey, k -> new 
AdaptiveMetrics());
+    }
+
+    public void addConsumerReq(String idKey){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.consumerReq.incrementAndGet();
+    }
+
+    public void addConsumerSuccess(String idKey){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.consumerSuccess.incrementAndGet();
+    }
+
+    public void addErrorReq(String idKey){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.errorReq.incrementAndGet();
+    }
+
+    public void setPickTime(String idKey,long time){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.pickTime = time;
+    }
+
+
+
+    public void setProviderMetrics(String idKey,Map<String,String> metricsMap){
+
+        AdaptiveMetrics metrics = getStatus(idKey);
+
+        long serviceTime = 
Long.parseLong(Optional.ofNullable(metricsMap.get("curTime")).filter(v -> 
StringUtils.isNumeric(v,false)).orElse("0"));
+        //If server time is less than the current time, discard
+        if (metrics.currentProviderTime > serviceTime){
+            return;
+        }
+
+        metrics.currentProviderTime = serviceTime;
+        metrics.currentTime = serviceTime;
+        metrics.providerCPULoad = 
Double.parseDouble(Optional.ofNullable(metricsMap.get("load")).filter(v -> 
StringUtils.isNumeric(v,true)).orElse("0"));
+        metrics.lastLatency = 
Long.parseLong((Optional.ofNullable(metricsMap.get("rt")).filter(v -> 
StringUtils.isNumeric(v,false)).orElse("0")));
+
+        metrics.beta = 0.5;
+        //Vt =  β * Vt-1 + (1 -  β ) * θt
+        metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * 
metrics.lastLatency;
+
+    }
+}
+

Review Comment:
   Missing test coverage for AdaptiveMetrics. No unit tests are provided for 
the AdaptiveMetrics class which contains complex calculation logic including 
EWMA calculations, load scoring, and concurrent data structures. Tests should 
verify the correctness of getLoad calculations, thread safety of concurrent 
operations, and edge cases like timeout handling and metrics decay.



##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AdaptiveMetrics.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import org.apache.dubbo.common.utils.StringUtils;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * adaptive Metrics statistics.
+ */
+public class AdaptiveMetrics {
+
+    private ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new 
ConcurrentHashMap<>();
+
+    private long currentProviderTime = 0;
+    private double providerCPULoad = 0;
+    private long lastLatency = 0;
+    private long currentTime = 0;
+
+    //Allow some time disorder
+    private long pickTime = System.currentTimeMillis();
+
+    private double beta = 0.5;
+    private final AtomicLong consumerReq = new AtomicLong();
+    private final AtomicLong consumerSuccess = new AtomicLong();
+    private final AtomicLong errorReq = new AtomicLong();
+    private double ewma = 0;
+
+    public double getLoad(String idKey,int weight,int timeout){
+        AdaptiveMetrics metrics = getStatus(idKey);
+
+        //If the time more than 2 times, mandatory selected
+        if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) {
+            return 0;
+        }
+
+        if (metrics.currentTime > 0){
+            long multiple = (System.currentTimeMillis() - metrics.currentTime) 
/ timeout + 1;
+            if (multiple > 0) {
+                if (metrics.currentProviderTime == metrics.currentTime) {
+                    //penalty value
+                    metrics.lastLatency = timeout * 2L;
+                }else {
+                    metrics.lastLatency = metrics.lastLatency >> multiple;
+                }
+                metrics.ewma = metrics.beta * metrics.ewma + (1 - 
metrics.beta) * metrics.lastLatency;
+                metrics.currentTime = System.currentTimeMillis();
+            }
+        }
+
+        long inflight = metrics.consumerReq.get() - 
metrics.consumerSuccess.get() - metrics.errorReq.get();
+        return metrics.providerCPULoad * (Math.sqrt(metrics.ewma) + 1) * 
(inflight + 1) / ((((double)metrics.consumerSuccess.get() / 
(double)(metrics.consumerReq.get() + 1)) * weight) + 1);
+    }
+
+    public AdaptiveMetrics getStatus(String idKey){
+        return metricsStatistics.computeIfAbsent(idKey, k -> new 
AdaptiveMetrics());
+    }
+
+    public void addConsumerReq(String idKey){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.consumerReq.incrementAndGet();
+    }
+
+    public void addConsumerSuccess(String idKey){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.consumerSuccess.incrementAndGet();
+    }
+
+    public void addErrorReq(String idKey){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.errorReq.incrementAndGet();
+    }
+
+    public void setPickTime(String idKey,long time){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.pickTime = time;
+    }
+
+
+
+    public void setProviderMetrics(String idKey,Map<String,String> metricsMap){
+
+        AdaptiveMetrics metrics = getStatus(idKey);
+
+        long serviceTime = 
Long.parseLong(Optional.ofNullable(metricsMap.get("curTime")).filter(v -> 
StringUtils.isNumeric(v,false)).orElse("0"));
+        //If server time is less than the current time, discard
+        if (metrics.currentProviderTime > serviceTime){
+            return;
+        }
+
+        metrics.currentProviderTime = serviceTime;
+        metrics.currentTime = serviceTime;
+        metrics.providerCPULoad = 
Double.parseDouble(Optional.ofNullable(metricsMap.get("load")).filter(v -> 
StringUtils.isNumeric(v,true)).orElse("0"));

Review Comment:
   Potential uncaught 'java.lang.NumberFormatException'.



##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AdaptiveMetrics.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import org.apache.dubbo.common.utils.StringUtils;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * adaptive Metrics statistics.
+ */
+public class AdaptiveMetrics {
+
+    private ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new 
ConcurrentHashMap<>();
+
+    private long currentProviderTime = 0;
+    private double providerCPULoad = 0;
+    private long lastLatency = 0;
+    private long currentTime = 0;
+
+    //Allow some time disorder
+    private long pickTime = System.currentTimeMillis();
+
+    private double beta = 0.5;
+    private final AtomicLong consumerReq = new AtomicLong();
+    private final AtomicLong consumerSuccess = new AtomicLong();
+    private final AtomicLong errorReq = new AtomicLong();
+    private double ewma = 0;
+
+    public double getLoad(String idKey,int weight,int timeout){
+        AdaptiveMetrics metrics = getStatus(idKey);
+
+        //If the time more than 2 times, mandatory selected
+        if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) {
+            return 0;
+        }
+
+        if (metrics.currentTime > 0){
+            long multiple = (System.currentTimeMillis() - metrics.currentTime) 
/ timeout + 1;
+            if (multiple > 0) {
+                if (metrics.currentProviderTime == metrics.currentTime) {
+                    //penalty value
+                    metrics.lastLatency = timeout * 2L;
+                }else {
+                    metrics.lastLatency = metrics.lastLatency >> multiple;
+                }
+                metrics.ewma = metrics.beta * metrics.ewma + (1 - 
metrics.beta) * metrics.lastLatency;
+                metrics.currentTime = System.currentTimeMillis();
+            }
+        }
+
+        long inflight = metrics.consumerReq.get() - 
metrics.consumerSuccess.get() - metrics.errorReq.get();
+        return metrics.providerCPULoad * (Math.sqrt(metrics.ewma) + 1) * 
(inflight + 1) / ((((double)metrics.consumerSuccess.get() / 
(double)(metrics.consumerReq.get() + 1)) * weight) + 1);
+    }
+
+    public AdaptiveMetrics getStatus(String idKey){
+        return metricsStatistics.computeIfAbsent(idKey, k -> new 
AdaptiveMetrics());
+    }
+
+    public void addConsumerReq(String idKey){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.consumerReq.incrementAndGet();
+    }
+
+    public void addConsumerSuccess(String idKey){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.consumerSuccess.incrementAndGet();
+    }
+
+    public void addErrorReq(String idKey){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.errorReq.incrementAndGet();
+    }
+
+    public void setPickTime(String idKey,long time){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.pickTime = time;
+    }
+
+
+
+    public void setProviderMetrics(String idKey,Map<String,String> metricsMap){
+
+        AdaptiveMetrics metrics = getStatus(idKey);
+
+        long serviceTime = 
Long.parseLong(Optional.ofNullable(metricsMap.get("curTime")).filter(v -> 
StringUtils.isNumeric(v,false)).orElse("0"));
+        //If server time is less than the current time, discard
+        if (metrics.currentProviderTime > serviceTime){
+            return;
+        }
+
+        metrics.currentProviderTime = serviceTime;
+        metrics.currentTime = serviceTime;
+        metrics.providerCPULoad = 
Double.parseDouble(Optional.ofNullable(metricsMap.get("load")).filter(v -> 
StringUtils.isNumeric(v,true)).orElse("0"));
+        metrics.lastLatency = 
Long.parseLong((Optional.ofNullable(metricsMap.get("rt")).filter(v -> 
StringUtils.isNumeric(v,false)).orElse("0")));

Review Comment:
   Potential uncaught 'java.lang.NumberFormatException'.



##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ProfilerServerFilter.java:
##########
@@ -78,6 +85,18 @@ private void afterInvoke(Invoker<?> invoker, Invocation 
invocation) {
             }
         }
     }
+    private void addAdaptiveResponse(Result appResponse, Invocation 
invocation) {
+        String adaptiveLoadAttachment = 
invocation.getAttachment(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY);
+        if (StringUtils.isNotEmpty(adaptiveLoadAttachment)) {
+            OperatingSystemMXBean operatingSystemMXBean = 
ManagementFactory.getOperatingSystemMXBean();
+
+            StringBuilder sb = new StringBuilder(64);

Review Comment:
   Inefficient string building with fixed capacity. The StringBuilder is 
initialized with a capacity of 64 characters, but this may be excessive or 
insufficient depending on the actual data. The constructed string 
"curTime:1234567890123,load:45.67" is relatively short and predictable in 
length. Consider adjusting the initial capacity to match the expected size more 
accurately (approximately 30-40 characters) to reduce memory waste.
   ```suggestion
               StringBuilder sb = new StringBuilder(40);
   ```



##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/AdaptiveLoadBalanceFilter.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.filter;
+
+import org.apache.dubbo.common.constants.LoadbalanceRules;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.common.resource.GlobalResourcesRepository;
+import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.rpc.AdaptiveMetrics;
+import org.apache.dubbo.rpc.Constants;
+import org.apache.dubbo.rpc.Filter;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
+import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
+import static 
org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY;
+
+/**
+ * if the load balance is adaptive ,set attachment to get the metrics of the 
server
+ * @see org.apache.dubbo.rpc.Filter
+ * @see org.apache.dubbo.rpc.RpcContext
+ */
+@Activate(group = CONSUMER, order = -200000, value = {"loadbalance:adaptive"})
+public class AdaptiveLoadBalanceFilter implements Filter, Filter.Listener {
+
+    /**
+     * uses a single worker thread operating off an bounded queue
+     */
+    private volatile ThreadPoolExecutor executor = null;
+
+    private AdaptiveMetrics adaptiveMetrics;
+
+    public AdaptiveLoadBalanceFilter(ApplicationModel scopeModel) {
+        adaptiveMetrics = 
scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class);
+    }
+
+    private ThreadPoolExecutor getExecutor(){
+        if (null == executor) {
+            synchronized (this) {
+                if (null == executor) {
+                    executor = new ThreadPoolExecutor(1, 1, 
0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
+                        new 
NamedInternalThreadFactory("Dubbo-framework-loadbalance-adaptive", true), new 
ThreadPoolExecutor.DiscardOldestPolicy());
+                    
GlobalResourcesRepository.getInstance().registerDisposable(() -> 
this.executor.shutdown());
+                }
+            }
+        }
+        return executor;
+    }
+
+    @Override
+    public Result invoke(Invoker<?> invoker, Invocation invocation) throws 
RpcException {
+        return invoker.invoke(invocation);
+    }
+
+    private String buildServiceKey(Invocation invocation){
+        StringBuilder sb = new StringBuilder(128);
+        
sb.append(invocation.getInvoker().getUrl().getAddress()).append(":").append(invocation.getProtocolServiceKey());
+        return sb.toString();
+    }
+
+    private String getServiceKey(Invocation invocation){
+
+        String key = (String) 
invocation.getAttributes().get(invocation.getInvoker());
+        if (StringUtils.isNotEmpty(key)){
+            return key;
+        }
+
+        key = buildServiceKey(invocation);
+        invocation.getAttributes().put(invocation.getInvoker(),key);
+        return key;
+    }
+
+    @Override
+    public void onResponse(Result appResponse, Invoker<?> invoker, Invocation 
invocation) {
+
+        try {
+            String loadBalance = (String) 
invocation.getAttributes().get(LOADBALANCE_KEY);
+            if (StringUtils.isEmpty(loadBalance)
+                || !LoadbalanceRules.ADAPTIVE.equals(loadBalance)) {
+                return;
+            }
+            adaptiveMetrics.addConsumerSuccess(getServiceKey(invocation));
+            String attachment = 
appResponse.getAttachment(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY);
+            if (StringUtils.isNotEmpty(attachment)) {
+                String[] parties = COMMA_SPLIT_PATTERN.split(attachment);
+                if (parties.length == 0) {
+                    return;
+                }
+                Map<String, String> metricsMap = new HashMap<>();
+                for (String party : parties) {
+                    String[] groups = party.split(":");
+                    if (groups.length != 2) {
+                        continue;
+                    }
+                    metricsMap.put(groups[0], groups[1]);
+                }
+
+                Long startTime = (Long) 
invocation.getAttributes().get(Constants.ADAPTIVE_LOADBALANCE_START_TIME);
+                if (null != startTime) {
+                    metricsMap.put("rt", 
String.valueOf(System.currentTimeMillis() - startTime));
+                }
+
+                getExecutor().execute(() -> {
+                    
adaptiveMetrics.setProviderMetrics(getServiceKey(invocation), metricsMap);
+                });
+            }
+        }
+        finally {
+            
appResponse.getAttachments().remove(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY);
+        }

Review Comment:
   Attachment cleanup occurs in finally block but should be conditional. The 
code removes the ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY from attachments 
regardless of whether the filter actually added it. If the filter logic in 
onResponse didn't execute due to early return (lines 103-106), removing the 
attachment could be incorrect. Consider moving the removal inside the try block 
after successful processing.



##########
dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/AdaptiveLoadBalance.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.loadbalance;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.LoadbalanceRules;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.rpc.AdaptiveMetrics;
+import org.apache.dubbo.rpc.Constants;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.support.RpcUtils;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static 
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static 
org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY;
+
+/**
+ * AdaptiveLoadBalance
+ * </p>
+ */
+public class AdaptiveLoadBalance extends AbstractLoadBalance {
+
+    public static final String NAME = "adaptive";
+
+    //default key
+    private String attachmentKey = "mem,load";
+
+    private AdaptiveMetrics adaptiveMetrics;
+
+    public AdaptiveLoadBalance(ApplicationModel scopeModel){
+        adaptiveMetrics = 
scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class);
+    }
+
+    @Override
+    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, 
Invocation invocation) {
+        Invoker<T> invoker = selectByP2C(invokers,invocation);
+        
invocation.setAttachment(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY,attachmentKey);
+        long startTime = System.currentTimeMillis();
+        
invocation.getAttributes().put(Constants.ADAPTIVE_LOADBALANCE_START_TIME,startTime);
+        
invocation.getAttributes().put(LOADBALANCE_KEY,LoadbalanceRules.ADAPTIVE);
+        adaptiveMetrics.addConsumerReq(getServiceKey(invoker,invocation));
+        
adaptiveMetrics.setPickTime(getServiceKey(invoker,invocation),startTime);
+
+        return invoker;
+    }
+
+    private <T> Invoker<T> selectByP2C(List<Invoker<T>> invokers, Invocation 
invocation){
+        int length = invokers.size();
+        if(length == 1) {
+            return invokers.get(0);
+        }
+
+        if(length == 2) {
+            return 
chooseLowLoadInvoker(invokers.get(0),invokers.get(1),invocation);
+        }
+
+        int pos1 = ThreadLocalRandom.current().nextInt(length);
+        int pos2 = ThreadLocalRandom.current().nextInt(length - 1);
+        if (pos2 >= pos1) {
+            pos2 = pos2 + 1;
+        }
+
+        return 
chooseLowLoadInvoker(invokers.get(pos1),invokers.get(pos2),invocation);
+    }
+
+    private String getServiceKey(Invoker<?> invoker,Invocation invocation){
+
+        String key = (String) invocation.getAttributes().get(invoker);
+        if (StringUtils.isNotEmpty(key)){
+            return key;
+        }
+
+        key = buildServiceKey(invoker,invocation);
+        invocation.getAttributes().put(invoker,key);
+        return key;

Review Comment:
   Invoker object is used as a map key in invocation attributes. On line 87, 
the invoker object itself is used as a key in invocation.getAttributes(). This 
is unusual and potentially problematic because invokers are heavyweight objects 
and using them as map keys may not provide the expected behavior if invoker 
instances are recreated. Consider using a string-based key or the invoker's URL 
as the key instead.



##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/AdaptiveLoadBalanceFilter.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.filter;
+
+import org.apache.dubbo.common.constants.LoadbalanceRules;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.common.resource.GlobalResourcesRepository;
+import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.rpc.AdaptiveMetrics;
+import org.apache.dubbo.rpc.Constants;
+import org.apache.dubbo.rpc.Filter;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
+import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
+import static 
org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY;
+
+/**
+ * if the load balance is adaptive ,set attachment to get the metrics of the 
server
+ * @see org.apache.dubbo.rpc.Filter
+ * @see org.apache.dubbo.rpc.RpcContext
+ */
+@Activate(group = CONSUMER, order = -200000, value = {"loadbalance:adaptive"})
+public class AdaptiveLoadBalanceFilter implements Filter, Filter.Listener {
+
+    /**
+     * uses a single worker thread operating off an bounded queue
+     */
+    private volatile ThreadPoolExecutor executor = null;
+
+    private AdaptiveMetrics adaptiveMetrics;
+
+    public AdaptiveLoadBalanceFilter(ApplicationModel scopeModel) {
+        adaptiveMetrics = 
scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class);
+    }
+
+    private ThreadPoolExecutor getExecutor(){
+        if (null == executor) {
+            synchronized (this) {
+                if (null == executor) {
+                    executor = new ThreadPoolExecutor(1, 1, 
0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
+                        new 
NamedInternalThreadFactory("Dubbo-framework-loadbalance-adaptive", true), new 
ThreadPoolExecutor.DiscardOldestPolicy());
+                    
GlobalResourcesRepository.getInstance().registerDisposable(() -> 
this.executor.shutdown());
+                }
+            }
+        }
+        return executor;
+    }

Review Comment:
   ThreadPoolExecutor initialization and shutdown have a potential resource leak. 
The executor is registered for disposal via 
GlobalResourcesRepository.getInstance().registerDisposable(), but if the filter 
is instantiated multiple times (e.g., in different application contexts), each 
instance will register its own shutdown hook. Additionally, the double-checked 
locking pattern used here is correct, but consider whether multiple filter 
instances sharing state through AdaptiveMetrics is the intended design.



##########
dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/AdaptiveLoadBalanceTest.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.loadbalance;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.AdaptiveMetrics;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+class AdaptiveLoadBalanceTest extends LoadBalanceBaseTest {
+
+    private ApplicationModel scopeModel;
+
+    private AdaptiveMetrics adaptiveMetrics;
+
+    @Test
+    @Order(0)
+    void testSelectByWeight() {
+        int sumInvoker1 = 0;
+        int sumInvoker2 = 0;
+        int sumInvoker3 = 0;
+        int loop = 10000;
+
+        ApplicationModel scopeModel = ApplicationModel.defaultModel();
+
+        AdaptiveLoadBalance lb = new AdaptiveLoadBalance(scopeModel);
+        for (int i = 0; i < loop; i++) {
+            Invoker selected = lb.select(weightInvokers, null, 
weightTestInvocation);
+
+            if (selected.getUrl().getProtocol().equals("test1")) {
+                sumInvoker1++;
+            }
+
+            if (selected.getUrl().getProtocol().equals("test2")) {
+                sumInvoker2++;
+            }
+
+            if (selected.getUrl().getProtocol().equals("test3")) {
+                sumInvoker3++;
+            }
+        }
+
+        // 1 : 9 : 6
+        System.out.println(sumInvoker1);
+        System.out.println(sumInvoker2);
+        System.out.println(sumInvoker3);

Review Comment:
   Test uses System.out.println for output instead of proper logging or 
assertions. Lines 71-73 print sumInvoker values but don't verify them against 
expected values. Consider either removing these print statements or converting 
them to proper assertions that validate the distribution is within acceptable 
bounds.
   ```suggestion
   
   ```



##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ProfilerServerFilter.java:
##########
@@ -78,6 +85,18 @@ private void afterInvoke(Invoker<?> invoker, Invocation 
invocation) {
             }
         }
     }
+    private void addAdaptiveResponse(Result appResponse, Invocation 
invocation) {
+        String adaptiveLoadAttachment = 
invocation.getAttachment(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY);
+        if (StringUtils.isNotEmpty(adaptiveLoadAttachment)) {
+            OperatingSystemMXBean operatingSystemMXBean = 
ManagementFactory.getOperatingSystemMXBean();
+
+            StringBuilder sb = new StringBuilder(64);
+            sb.append("curTime:").append(System.currentTimeMillis());
+            
sb.append(COMMA_SEPARATOR).append("load:").append(operatingSystemMXBean.getSystemLoadAverage()
 * 100 / operatingSystemMXBean.getAvailableProcessors() );

Review Comment:
   Load percentage calculation is hard to read and does not handle an unavailable 
   load average. The expression operatingSystemMXBean.getSystemLoadAverage() * 100 / 
   operatingSystemMXBean.getAvailableProcessors() is evaluated in double 
   arithmetic (getSystemLoadAverage() returns a double), so there is no integer 
   overflow and it is numerically equivalent to (load / processors) * 100. For 
   example, on a 4-core system with a load of 2.0, (2.0 * 100) / 4 = 50 and 
   (2.0 / 4) * 100 = 50. The real risks are that getSystemLoadAverage() returns a 
   negative value when the load average is not available on the platform, and 
   that the load average can exceed the processor count, so the reported value 
   may be negative or greater than 100. Grouping the division first, as 
   suggested below, only makes the per-processor intent explicit.
   ```suggestion
               sb.append(COMMA_SEPARATOR).append("load:").append(
                       (operatingSystemMXBean.getSystemLoadAverage() / 
operatingSystemMXBean.getAvailableProcessors()) * 100
               );
   ```



##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AdaptiveMetrics.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import org.apache.dubbo.common.utils.StringUtils;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * adaptive Metrics statistics.
+ */
+public class AdaptiveMetrics {
+
+    private ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new 
ConcurrentHashMap<>();
+
+    private long currentProviderTime = 0;
+    private double providerCPULoad = 0;
+    private long lastLatency = 0;
+    private long currentTime = 0;
+
+    //Allow some time disorder
+    private long pickTime = System.currentTimeMillis();
+
+    private double beta = 0.5;
+    private final AtomicLong consumerReq = new AtomicLong();
+    private final AtomicLong consumerSuccess = new AtomicLong();
+    private final AtomicLong errorReq = new AtomicLong();
+    private double ewma = 0;
+
+    public double getLoad(String idKey,int weight,int timeout){
+        AdaptiveMetrics metrics = getStatus(idKey);
+
+        //If the time more than 2 times, mandatory selected
+        if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) {
+            return 0;
+        }
+
+        if (metrics.currentTime > 0){
+            long multiple = (System.currentTimeMillis() - metrics.currentTime) 
/ timeout + 1;
+            if (multiple > 0) {
+                if (metrics.currentProviderTime == metrics.currentTime) {
+                    //penalty value
+                    metrics.lastLatency = timeout * 2L;
+                }else {
+                    metrics.lastLatency = metrics.lastLatency >> multiple;
+                }
+                metrics.ewma = metrics.beta * metrics.ewma + (1 - 
metrics.beta) * metrics.lastLatency;
+                metrics.currentTime = System.currentTimeMillis();
+            }
+        }
+
+        long inflight = metrics.consumerReq.get() - 
metrics.consumerSuccess.get() - metrics.errorReq.get();
+        return metrics.providerCPULoad * (Math.sqrt(metrics.ewma) + 1) * 
(inflight + 1) / ((((double)metrics.consumerSuccess.get() / 
(double)(metrics.consumerReq.get() + 1)) * weight) + 1);

Review Comment:
   Complex load calculation formula lacks documentation and explanation. The 
formula on line 71 combines CPU load, EWMA, inflight requests, and success rate 
in a specific way, but there's no explanation of why this particular formula 
was chosen or how each component contributes to the final load score. Adding 
detailed comments explaining the rationale and expected behavior would improve 
maintainability. For example, explain why sqrt(ewma) is used instead of ewma 
directly, and what the +1 terms prevent.



##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/AdaptiveLoadBalanceFilter.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.filter;
+
+import org.apache.dubbo.common.constants.LoadbalanceRules;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.common.resource.GlobalResourcesRepository;
+import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.rpc.AdaptiveMetrics;
+import org.apache.dubbo.rpc.Constants;
+import org.apache.dubbo.rpc.Filter;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
+import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
+import static 
org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY;
+
+/**
+ * if the load balance is adaptive ,set attachment to get the metrics of the 
server
+ * @see org.apache.dubbo.rpc.Filter
+ * @see org.apache.dubbo.rpc.RpcContext
+ */
+@Activate(group = CONSUMER, order = -200000, value = {"loadbalance:adaptive"})
+public class AdaptiveLoadBalanceFilter implements Filter, Filter.Listener {
+
+    /**
+     * uses a single worker thread operating off an bounded queue
+     */
+    private volatile ThreadPoolExecutor executor = null;
+
+    private AdaptiveMetrics adaptiveMetrics;
+
+    public AdaptiveLoadBalanceFilter(ApplicationModel scopeModel) {
+        adaptiveMetrics = 
scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class);
+    }
+
+    private ThreadPoolExecutor getExecutor(){
+        if (null == executor) {
+            synchronized (this) {
+                if (null == executor) {
+                    executor = new ThreadPoolExecutor(1, 1, 
0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
+                        new 
NamedInternalThreadFactory("Dubbo-framework-loadbalance-adaptive", true), new 
ThreadPoolExecutor.DiscardOldestPolicy());
+                    
GlobalResourcesRepository.getInstance().registerDisposable(() -> 
this.executor.shutdown());
+                }
+            }
+        }
+        return executor;
+    }
+
+    @Override
+    public Result invoke(Invoker<?> invoker, Invocation invocation) throws 
RpcException {
+        return invoker.invoke(invocation);
+    }
+
+    private String buildServiceKey(Invocation invocation){
+        StringBuilder sb = new StringBuilder(128);
+        
sb.append(invocation.getInvoker().getUrl().getAddress()).append(":").append(invocation.getProtocolServiceKey());
+        return sb.toString();
+    }
+
+    private String getServiceKey(Invocation invocation){
+
+        String key = (String) 
invocation.getAttributes().get(invocation.getInvoker());
+        if (StringUtils.isNotEmpty(key)){
+            return key;
+        }
+
+        key = buildServiceKey(invocation);
+        invocation.getAttributes().put(invocation.getInvoker(),key);

Review Comment:
   Invoker object is used as a map key in invocation attributes. On line 88, 
the invoker object itself is used as a key in invocation.getAttributes(). This 
is unusual and potentially problematic because invokers are heavyweight objects 
and using them as map keys may not provide the expected behavior if invoker 
instances are recreated. Consider using a string-based key or the invoker's URL 
as the key instead.



##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AdaptiveMetrics.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import org.apache.dubbo.common.utils.StringUtils;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * adaptive Metrics statistics.
+ */
+public class AdaptiveMetrics {
+
+    private ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new 
ConcurrentHashMap<>();
+
+    private long currentProviderTime = 0;
+    private double providerCPULoad = 0;
+    private long lastLatency = 0;
+    private long currentTime = 0;
+
+    //Allow some time disorder
+    private long pickTime = System.currentTimeMillis();
+
+    private double beta = 0.5;
+    private final AtomicLong consumerReq = new AtomicLong();
+    private final AtomicLong consumerSuccess = new AtomicLong();
+    private final AtomicLong errorReq = new AtomicLong();
+    private double ewma = 0;
+
+    public double getLoad(String idKey,int weight,int timeout){
+        AdaptiveMetrics metrics = getStatus(idKey);
+
+        //If the time more than 2 times, mandatory selected
+        if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) {
+            return 0;
+        }
+
+        if (metrics.currentTime > 0){
+            long multiple = (System.currentTimeMillis() - metrics.currentTime) 
/ timeout + 1;
+            if (multiple > 0) {
+                if (metrics.currentProviderTime == metrics.currentTime) {
+                    //penalty value
+                    metrics.lastLatency = timeout * 2L;
+                }else {
+                    metrics.lastLatency = metrics.lastLatency >> multiple;
+                }
+                metrics.ewma = metrics.beta * metrics.ewma + (1 - 
metrics.beta) * metrics.lastLatency;
+                metrics.currentTime = System.currentTimeMillis();
+            }
+        }
+
+        long inflight = metrics.consumerReq.get() - 
metrics.consumerSuccess.get() - metrics.errorReq.get();
+        return metrics.providerCPULoad * (Math.sqrt(metrics.ewma) + 1) * 
(inflight + 1) / ((((double)metrics.consumerSuccess.get() / 
(double)(metrics.consumerReq.get() + 1)) * weight) + 1);

Review Comment:
   The success-rate calculation is skewed by the +1 used to guard against 
   division by zero. The expression (metrics.consumerSuccess.get() / 
   (metrics.consumerReq.get() + 1)) never divides by zero: when consumerReq is 0, 
   adding 1 makes the denominator 1, so a success count of 0 gives 0/1 = 0. 
   However, the +1 also understates the success rate for every non-zero request 
   count (for example, 1 success out of 1 request yields 0.5). The zero case 
   should be handled explicitly instead: the success rate should be 
   consumerSuccess / consumerReq (with a proper zero check), not 
   consumerSuccess / (consumerReq + 1).



##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/AdaptiveLoadBalanceFilter.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.filter;
+
+import org.apache.dubbo.common.constants.LoadbalanceRules;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.common.resource.GlobalResourcesRepository;
+import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.rpc.AdaptiveMetrics;
+import org.apache.dubbo.rpc.Constants;
+import org.apache.dubbo.rpc.Filter;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
+import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
+import static 
org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY;
+
+/**
+ * if the load balance is adaptive ,set attachment to get the metrics of the 
server
+ * @see org.apache.dubbo.rpc.Filter
+ * @see org.apache.dubbo.rpc.RpcContext
+ */
+@Activate(group = CONSUMER, order = -200000, value = {"loadbalance:adaptive"})

Review Comment:
   The @Activate value syntax appears incorrect. The value array should contain 
URL parameter keys (e.g., "actives", "cache"), not a colon-separated format 
like "loadbalance:adaptive". Based on the Dubbo framework conventions, this 
filter should likely check if the loadbalance parameter equals "adaptive". 
Consider using just "loadbalance" as the value or implementing custom 
activation logic in the invoke method.
   ```suggestion
   @Activate(group = CONSUMER, order = -200000, value = {LOADBALANCE_KEY})
   ```



##########
dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/AdaptiveLoadBalanceTest.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.loadbalance;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.AdaptiveMetrics;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+class AdaptiveLoadBalanceTest extends LoadBalanceBaseTest {
+
+    private ApplicationModel scopeModel;
+
+    private AdaptiveMetrics adaptiveMetrics;
+
+    @Test
+    @Order(0)
+    void testSelectByWeight() {
+        int sumInvoker1 = 0;
+        int sumInvoker2 = 0;
+        int sumInvoker3 = 0;
+        int loop = 10000;
+
+        ApplicationModel scopeModel = ApplicationModel.defaultModel();
+
+        AdaptiveLoadBalance lb = new AdaptiveLoadBalance(scopeModel);
+        for (int i = 0; i < loop; i++) {
+            Invoker selected = lb.select(weightInvokers, null, 
weightTestInvocation);
+
+            if (selected.getUrl().getProtocol().equals("test1")) {
+                sumInvoker1++;
+            }
+
+            if (selected.getUrl().getProtocol().equals("test2")) {
+                sumInvoker2++;
+            }
+
+            if (selected.getUrl().getProtocol().equals("test3")) {
+                sumInvoker3++;
+            }
+        }
+
+        // 1 : 9 : 6
+        System.out.println(sumInvoker1);
+        System.out.println(sumInvoker2);
+        System.out.println(sumInvoker3);
+        Assertions.assertEquals(sumInvoker1 + sumInvoker2 + sumInvoker3, loop, 
"select failed!");
+    }
+
+    private String buildServiceKey(Invoker invoker){
+        URL url = invoker.getUrl();
+        return url.getAddress() + ":" + invocation.getProtocolServiceKey();
+    }
+
+    private AdaptiveMetrics getAdaptiveMetricsInstance(){
+        if (adaptiveMetrics == null) {
+            adaptiveMetrics = 
scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class);
+        }
+        return adaptiveMetrics;
+    }
+
+    @Test
+    @Order(1)
+    void testSelectByAdaptive() {
+        int sumInvoker1 = 0;
+        int sumInvoker2 = 0;
+        int sumInvoker5 = 0;
+        int loop = 10000;
+
+        scopeModel = ApplicationModel.defaultModel();
+        AdaptiveLoadBalance lb = new AdaptiveLoadBalance(scopeModel);
+
+        lb.select(weightInvokersSR, null, weightTestInvocation);
+
+        for (int i = 0; i < loop; i++) {
+            Invoker selected = lb.select(weightInvokersSR, null, 
weightTestInvocation);
+
+            Map<String, String> metricsMap = new HashMap<>();
+            String idKey = buildServiceKey(selected);
+
+            if (selected.getUrl().getProtocol().equals("test1")) {
+                sumInvoker1++;
+                metricsMap.put("rt", "10");
+                metricsMap.put("load", "10");
+                metricsMap.put("curTime", 
String.valueOf(System.currentTimeMillis()-10));
+                getAdaptiveMetricsInstance().addConsumerSuccess(idKey);
+            }
+
+            if (selected.getUrl().getProtocol().equals("test2")) {
+                sumInvoker2++;
+                metricsMap.put("rt", "100");
+                metricsMap.put("load", "40");
+                metricsMap.put("curTime", 
String.valueOf(System.currentTimeMillis()-100));
+                getAdaptiveMetricsInstance().addConsumerSuccess(idKey);
+            }
+
+            if (selected.getUrl().getProtocol().equals("test5")) {
+                metricsMap.put("rt", "5000");
+                metricsMap.put("load", "400");//400%
+                metricsMap.put("curTime", 
String.valueOf(System.currentTimeMillis() - 5000));
+
+                getAdaptiveMetricsInstance().addErrorReq(idKey);
+                sumInvoker5++;
+            }
+            getAdaptiveMetricsInstance().setProviderMetrics(idKey,metricsMap);
+
+        }
+        Map<Invoker<LoadBalanceBaseTest>, Integer> weightMap = 
weightInvokersSR.stream()
+            .collect(Collectors.toMap(Function.identity(), e -> 
Integer.valueOf(e.getUrl().getParameter("weight"))));

Review Comment:
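   Potential uncaught 'java.lang.NumberFormatException': 
   Integer.valueOf(e.getUrl().getParameter("weight")) throws it when the 
   "weight" parameter is missing or is not a valid integer.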



##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AdaptiveMetrics.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import org.apache.dubbo.common.utils.StringUtils;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * adaptive Metrics statistics.
+ */
+public class AdaptiveMetrics {
+
+    private ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new 
ConcurrentHashMap<>();
+
+    private long currentProviderTime = 0;
+    private double providerCPULoad = 0;
+    private long lastLatency = 0;
+    private long currentTime = 0;
+
+    //Allow some time disorder
+    private long pickTime = System.currentTimeMillis();
+
+    private double beta = 0.5;
+    private final AtomicLong consumerReq = new AtomicLong();
+    private final AtomicLong consumerSuccess = new AtomicLong();
+    private final AtomicLong errorReq = new AtomicLong();
+    private double ewma = 0;
+
+    public double getLoad(String idKey,int weight,int timeout){
+        AdaptiveMetrics metrics = getStatus(idKey);
+
+        //If the time more than 2 times, mandatory selected
+        if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) {
+            return 0;
+        }
+
+        if (metrics.currentTime > 0){
+            long multiple = (System.currentTimeMillis() - metrics.currentTime) 
/ timeout + 1;
+            if (multiple > 0) {
+                if (metrics.currentProviderTime == metrics.currentTime) {
+                    //penalty value
+                    metrics.lastLatency = timeout * 2L;
+                }else {
+                    metrics.lastLatency = metrics.lastLatency >> multiple;
+                }
+                metrics.ewma = metrics.beta * metrics.ewma + (1 - 
metrics.beta) * metrics.lastLatency;
+                metrics.currentTime = System.currentTimeMillis();
+            }
+        }
+
+        long inflight = metrics.consumerReq.get() - 
metrics.consumerSuccess.get() - metrics.errorReq.get();
+        return metrics.providerCPULoad * (Math.sqrt(metrics.ewma) + 1) * 
(inflight + 1) / ((((double)metrics.consumerSuccess.get() / 
(double)(metrics.consumerReq.get() + 1)) * weight) + 1);
+    }
+
+    public AdaptiveMetrics getStatus(String idKey){
+        return metricsStatistics.computeIfAbsent(idKey, k -> new 
AdaptiveMetrics());
+    }
+
+    public void addConsumerReq(String idKey){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.consumerReq.incrementAndGet();
+    }
+
+    public void addConsumerSuccess(String idKey){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.consumerSuccess.incrementAndGet();
+    }
+
+    public void addErrorReq(String idKey){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.errorReq.incrementAndGet();
+    }
+
+    public void setPickTime(String idKey,long time){
+        AdaptiveMetrics metrics = getStatus(idKey);
+        metrics.pickTime = time;
+    }
+
+
+
+    public void setProviderMetrics(String idKey,Map<String,String> metricsMap){
+
+        AdaptiveMetrics metrics = getStatus(idKey);
+
+        long serviceTime = 
Long.parseLong(Optional.ofNullable(metricsMap.get("curTime")).filter(v -> 
StringUtils.isNumeric(v,false)).orElse("0"));

Review Comment:
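   Potential uncaught 'java.lang.NumberFormatException': `Long.parseLong(...)` can still throw if the `curTime` value passes the numeric filter but does not fit in a `long` (for example, a very long digit string).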



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to