This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 3.0-adaptive-stream
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/3.0-adaptive-stream by this 
push:
     new 4ed7c71cd feat: adaptive load balancer of adaptive service (#2067)
4ed7c71cd is described below

commit 4ed7c71cd850b07ebb0fa93e695f981d299a4bc3
Author: Zhang Yepeng <[email protected]>
AuthorDate: Sat Dec 10 13:43:40 2022 +0800

    feat: adaptive load balancer of adaptive service (#2067)
    
    * feat(adaptive service): add success rate and rtt to p2c's weight
    
    * feat(adaptive service): p2c weight
    
    * feat(adaptive service): ema
    
    * fix license
    
    * fix time
    
    * fix error tests
    
    * fix p2c tests
    
    * remove useless division calculations
    
    * move ema and slidingWindowCounter to RollingMetrics
    
    * make reading from ema and slidingWindowCounter thread-safe
    
    * add GetMethodMetricsKey method
    
    * validate rolling metrics implementation
    
    * add license
    
    * fix p2c tests
    
    * add license
    
    * rename sliding_window_counter_metrics
    
    * remove useless factor
---
 cluster/cluster/adaptivesvc/cluster_invoker.go     |  52 +++----
 cluster/loadbalance/p2c/loadbalance.go             |  87 ++++++------
 cluster/loadbalance/p2c/loadbalance_test.go        | 155 ++++++++++++++++-----
 cluster/metrics/constants.go                       |   3 +
 cluster/metrics/local_metrics.go                   |  20 ++-
 cluster/metrics/metrics.go                         |   8 --
 cluster/metrics/{utils.go => rolling/ema.go}       |  40 ++++--
 cluster/metrics/rolling/ema_metrics.go             |  59 ++++++++
 .../{utils.go => rolling/rolling_metrics.go}       |  19 ++-
 cluster/metrics/rolling/rolling_metrics_mock.go    |  86 ++++++++++++
 cluster/metrics/rolling/sliding_window_counter.go  | 106 ++++++++++++++
 .../rolling/sliding_window_counter_metrics.go      |  59 ++++++++
 cluster/metrics/{ => utils}/utils.go               |  47 ++++++-
 cluster/utils/adaptivesvc.go                       |   9 ++
 14 files changed, 604 insertions(+), 146 deletions(-)

diff --git a/cluster/cluster/adaptivesvc/cluster_invoker.go 
b/cluster/cluster/adaptivesvc/cluster_invoker.go
index 7eaf456e8..ebee38faa 100644
--- a/cluster/cluster/adaptivesvc/cluster_invoker.go
+++ b/cluster/cluster/adaptivesvc/cluster_invoker.go
@@ -19,12 +19,10 @@ package adaptivesvc
 
 import (
        "context"
-       "strconv"
+       "time"
 )
 
 import (
-       "github.com/dubbogo/gost/log/logger"
-
        perrors "github.com/pkg/errors"
 )
 
@@ -65,45 +63,29 @@ func (ivk *adaptiveServiceClusterInvoker) Invoke(ctx 
context.Context, invocation
 
        // select a node by the loadBalance
        invoker := lb.Select(invokers, invocation)
-
+       _ = 
metrics.SlidingWindowCounterMetrics.AppendMethodMetrics(invoker.GetURL(), 
invocation.MethodName(), metrics.Requests, 1)
        // invoke
        invocation.SetAttachment(constant.AdaptiveServiceEnabledKey, 
constant.AdaptiveServiceIsEnabled)
+       startTime := time.Now().UnixNano()
        result := invoker.Invoke(ctx, invocation)
-
+       rtt := time.Now().UnixNano() - startTime
        // if the adaptive service encounters an error, DO NOT
        // update the metrics.
-       if clsutils.IsAdaptiveServiceFailed(result.Error()) {
+       if shouldDrop(result.Error()) {
                return result
        }
+       _ = metrics.EMAMetrics.AppendMethodMetrics(invoker.GetURL(), 
invocation.MethodName(), metrics.RTT, float64(rtt))
+       _ = 
metrics.SlidingWindowCounterMetrics.AppendMethodMetrics(invoker.GetURL(), 
invocation.MethodName(), metrics.Accepts, 1)
+       return result
+}
 
-       // update metrics
-       var remainingStr string
-       remainingIface := 
result.Attachment(constant.AdaptiveServiceRemainingKey, nil)
-       if remainingIface != nil {
-               if str, strOK := remainingIface.(string); strOK {
-                       remainingStr = str
-               } else if strArr, strArrOK := remainingIface.([]string); 
strArrOK && len(strArr) > 0 {
-                       remainingStr = strArr[0]
-               }
-       }
-       if remainingStr == "" {
-               logger.Errorf("[adasvc cluster] The %s field type of value %v 
should be string.",
-                       constant.AdaptiveServiceRemainingKey, remainingIface)
-               return result
+func shouldDrop(err error) bool {
+       switch {
+       case clsutils.IsAdaptiveServiceFailed(err):
+               return true
+       case clsutils.IsDeadlineExceeded(err):
+               return true
+       default:
+               return false
        }
-       remaining, err := strconv.Atoi(remainingStr)
-       if err != nil {
-               logger.Warnf("the remaining is unexpected, we need a int type, 
but we got %s, err: %v.", remainingStr, err)
-               return result
-       }
-       logger.Debugf("[adasvc cluster] The server status was received 
successfully, %s: %#v",
-               constant.AdaptiveServiceRemainingKey, remainingStr)
-       err = metrics.LocalMetrics.SetMethodMetrics(invoker.GetURL(),
-               invocation.MethodName(), metrics.HillClimbing, 
uint64(remaining))
-       if err != nil {
-               logger.Warnf("adaptive service metrics update is failed, err: 
%v", err)
-               return &protocol.RPCResult{Err: err}
-       }
-
-       return result
 }
diff --git a/cluster/loadbalance/p2c/loadbalance.go 
b/cluster/loadbalance/p2c/loadbalance.go
index 12f2a37c5..1f60277ec 100644
--- a/cluster/loadbalance/p2c/loadbalance.go
+++ b/cluster/loadbalance/p2c/loadbalance.go
@@ -18,8 +18,6 @@
 package p2c
 
 import (
-       "errors"
-       "fmt"
        "math/rand"
        "sync"
        "time"
@@ -32,6 +30,7 @@ import (
 import (
        "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
        "dubbo.apache.org/dubbo-go/v3/cluster/metrics"
+       "dubbo.apache.org/dubbo-go/v3/common"
        "dubbo.apache.org/dubbo-go/v3/common/constant"
        "dubbo.apache.org/dubbo-go/v3/common/extension"
        "dubbo.apache.org/dubbo-go/v3/protocol"
@@ -71,9 +70,6 @@ func (l *p2cLoadBalance) Select(invokers []protocol.Invoker, 
invocation protocol
        if len(invokers) == 1 {
                return invokers[0]
        }
-       // m is the Metrics, which saves the metrics of instance, invokers and 
methods
-       // The local metrics is available only for the earlier version.
-       m := metrics.LocalMetrics
        // picks two nodes randomly
        var i, j int
        if len(invokers) == 2 {
@@ -89,45 +85,11 @@ func (l *p2cLoadBalance) Select(invokers 
[]protocol.Invoker, invocation protocol
                i, invokers[i], j, invokers[j])
 
        methodName := invocation.ActualMethodName()
-       // remainingIIface, remainingJIface means remaining capacity of node i 
and node j.
-       // If one of the metrics is empty, invoke the invocation to that node 
directly.
-       remainingIIface, err := m.GetMethodMetrics(invokers[i].GetURL(), 
methodName, metrics.HillClimbing)
-       if err != nil {
-               if errors.Is(err, metrics.ErrMetricsNotFound) {
-                       logger.Debugf("[P2C select] The invoker[%d] was 
selected, because it hasn't been selected before.", i)
-                       return invokers[i]
-               }
-               logger.Warnf("get method metrics err: %v", err)
-               return nil
-       }
 
-       // TODO(justxuewei): It should have a strategy to drop some metrics 
after a period of time.
-       remainingJIface, err := m.GetMethodMetrics(invokers[j].GetURL(), 
methodName, metrics.HillClimbing)
-       if err != nil {
-               if errors.Is(err, metrics.ErrMetricsNotFound) {
-                       logger.Debugf("[P2C select] The invoker[%d] was 
selected, because it hasn't been selected before.", j)
-                       return invokers[j]
-               }
-               logger.Warnf("get method metrics err: %v", err)
-               return nil
-       }
-
-       // Convert interface to int, if the type is unexpected, panic 
immediately
-       remainingI, ok := remainingIIface.(uint64)
-       if !ok {
-               panic(fmt.Sprintf("[P2C select] the type of %s expects to be 
uint64, but gets %T",
-                       metrics.HillClimbing, remainingIIface))
-       }
-
-       remainingJ, ok := remainingJIface.(uint64)
-       if !ok {
-               panic(fmt.Sprintf("the type of %s expects to be uint64, but 
gets %T", metrics.HillClimbing, remainingJIface))
-       }
-
-       logger.Debugf("[P2C select] The invoker[%d] remaining is %d, and the 
invoker[%d] is %d.", i, remainingI, j, remainingJ)
+       weightI, weightJ := Weight(invokers[i].GetURL(), invokers[j].GetURL(), 
methodName)
 
        // For the remaining capacity, the bigger, the better.
-       if remainingI > remainingJ {
+       if weightI > weightJ {
                logger.Debugf("[P2C select] The invoker[%d] was selected.", i)
                return invokers[i]
        }
@@ -135,3 +97,46 @@ func (l *p2cLoadBalance) Select(invokers 
[]protocol.Invoker, invocation protocol
        logger.Debugf("[P2C select] The invoker[%d] was selected.", j)
        return invokers[j]
 }
+
+//Weight w_i = s_i + ε*t_i
+func Weight(url1, url2 *common.URL, methodName string) (weight1, weight2 
float64) {
+
+       s1 := successRateWeight(url1, methodName)
+       s2 := successRateWeight(url2, methodName)
+
+       rtt1, _ := metrics.EMAMetrics.GetMethodMetrics(url1, methodName, 
metrics.RTT)
+       rtt2, _ := metrics.EMAMetrics.GetMethodMetrics(url2, methodName, 
metrics.RTT)
+
+       logger.Debugf("[P2C Weight Metrics] [invoker1] %s's s score: %f, rtt: 
%f; [invoker2] %s's s score: %f, rtt: %f.",
+               url1.Ip, s1, rtt1, url2.Ip, s2, rtt2)
+       avgRtt := (rtt1 + rtt2) / 2
+       t1 := normalize((1 + avgRtt) / (1 + rtt1))
+       t2 := normalize((1 + avgRtt) / (1 + rtt2))
+
+       e := (s1 + s2) / (t1 + t2)
+
+       weight1 = s1 + e*t1
+       weight2 = s2 + e*t2
+       logger.Debugf("[P2C Weight] [invoker1] %s's s score: %f, t score: %f, 
weight: %f; [invoker2] %s's s score: %f, t score: %f, weight: %f.",
+               url1.Ip, s1, t1, weight1, url2.Ip, s2, t2, weight2)
+       return weight1, weight2
+}
+
+func normalize(x float64) float64 {
+       return x / (x + 1)
+}
+
+func successRateWeight(url *common.URL, methodName string) float64 {
+       requests, _ := 
metrics.SlidingWindowCounterMetrics.GetMethodMetrics(url, methodName, 
metrics.Requests)
+       accepts, _ := metrics.SlidingWindowCounterMetrics.GetMethodMetrics(url, 
methodName, metrics.Accepts)
+
+       r := (1 + accepts) / (1 + requests)
+
+       // r may be greater than 1 because SlidingWindowCounter collects the most 
recent data and there is a delay in receiving a response.
+       if r > 1 {
+               r = 1
+       }
+       logger.Debugf("[P2C Weight] [Success Rate] %s requests: %f, accepts: 
%f, success rate: %f.",
+               url.Ip, requests, accepts, r)
+       return r
+}
diff --git a/cluster/loadbalance/p2c/loadbalance_test.go 
b/cluster/loadbalance/p2c/loadbalance_test.go
index ff27ed0a6..e74719552 100644
--- a/cluster/loadbalance/p2c/loadbalance_test.go
+++ b/cluster/loadbalance/p2c/loadbalance_test.go
@@ -30,6 +30,8 @@ import (
 
 import (
        "dubbo.apache.org/dubbo-go/v3/cluster/metrics"
+       "dubbo.apache.org/dubbo-go/v3/cluster/metrics/rolling"
+       "dubbo.apache.org/dubbo-go/v3/cluster/metrics/utils"
        "dubbo.apache.org/dubbo-go/v3/common"
        "dubbo.apache.org/dubbo-go/v3/protocol"
        protoinvoc "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
@@ -64,21 +66,40 @@ func TestLoadBalance(t *testing.T) {
                ctrl := gomock.NewController(t)
                defer ctrl.Finish()
 
-               m := metrics.NewMockMetrics(ctrl)
-               metrics.LocalMetrics = m
+               emaMetrics := rolling.NewMockMetrics(ctrl)
+               metrics.EMAMetrics = emaMetrics
+               slidingWindowCounterMetrics := rolling.NewMockMetrics(ctrl)
+               metrics.SlidingWindowCounterMetrics = 
slidingWindowCounterMetrics
 
                url0, _ := 
common.NewURL("dubbo://192.168.1.0:20000/com.ikurento.user.UserProvider")
                url1, _ := 
common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
-
-               m.EXPECT().
-                       GetMethodMetrics(gomock.Eq(url0), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
+               //rtt
+               emaMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url0), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.RTT)).
                        Times(1).
-                       Return(uint64(10), nil)
-               m.EXPECT().
-                       GetMethodMetrics(gomock.Eq(url1), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
+                       Return(float64(10), nil)
+               emaMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url1), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.RTT)).
                        Times(1).
-                       Return(uint64(5), nil)
-
+                       Return(float64(5), nil)
+               //requests
+               slidingWindowCounterMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url0), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Requests)).
+                       Times(1).
+                       Return(float64(10), nil)
+               slidingWindowCounterMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url1), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Requests)).
+                       Times(1).
+                       Return(float64(10), nil)
+               //accepts
+               slidingWindowCounterMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url0), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Accepts)).
+                       Times(1).
+                       Return(float64(5), nil)
+               slidingWindowCounterMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url1), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Accepts)).
+                       Times(1).
+                       Return(float64(5), nil)
                ivkArr := []protocol.Invoker{
                        protocol.NewBaseInvoker(url0),
                        protocol.NewBaseInvoker(url1),
@@ -86,7 +107,7 @@ func TestLoadBalance(t *testing.T) {
 
                ivk := lb.Select(ivkArr, invocation)
 
-               assert.Equal(t, ivkArr[0].GetURL().String(), 
ivk.GetURL().String())
+               assert.Equal(t, ivkArr[1].GetURL().String(), 
ivk.GetURL().String())
        })
 
        t.Run("multiple invokers", func(t *testing.T) {
@@ -94,21 +115,42 @@ func TestLoadBalance(t *testing.T) {
                ctrl := gomock.NewController(t)
                defer ctrl.Finish()
 
-               m := metrics.NewMockMetrics(ctrl)
-               metrics.LocalMetrics = m
+               emaMetrics := rolling.NewMockMetrics(ctrl)
+               metrics.EMAMetrics = emaMetrics
+               slidingWindowCounterMetrics := rolling.NewMockMetrics(ctrl)
+               metrics.SlidingWindowCounterMetrics = 
slidingWindowCounterMetrics
 
                url0, _ := 
common.NewURL("dubbo://192.168.1.0:20000/com.ikurento.user.UserProvider")
                url1, _ := 
common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
                url2, _ := 
common.NewURL("dubbo://192.168.1.2:20000/com.ikurento.user.UserProvider")
 
-               m.EXPECT().
-                       GetMethodMetrics(gomock.Eq(url0), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
+               //rtt
+               emaMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url0), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.RTT)).
+                       Times(1).
+                       Return(float64(10), nil)
+               emaMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url1), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.RTT)).
+                       Times(1).
+                       Return(float64(5), nil)
+               //requests
+               slidingWindowCounterMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url0), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Requests)).
+                       Times(1).
+                       Return(float64(10), nil)
+               slidingWindowCounterMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url1), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Requests)).
+                       Times(1).
+                       Return(float64(10), nil)
+               //accepts
+               slidingWindowCounterMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url0), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Accepts)).
                        Times(1).
-                       Return(uint64(10), nil)
-               m.EXPECT().
-                       GetMethodMetrics(gomock.Eq(url1), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
+                       Return(float64(5), nil)
+               slidingWindowCounterMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url1), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Accepts)).
                        Times(1).
-                       Return(uint64(5), nil)
+                       Return(float64(5), nil)
 
                ivkArr := []protocol.Invoker{
                        protocol.NewBaseInvoker(url0),
@@ -118,7 +160,7 @@ func TestLoadBalance(t *testing.T) {
 
                ivk := lb.Select(ivkArr, invocation)
 
-               assert.Equal(t, ivkArr[0].GetURL().String(), 
ivk.GetURL().String())
+               assert.Equal(t, ivkArr[1].GetURL().String(), 
ivk.GetURL().String())
        })
 
        t.Run("metrics i not found", func(t *testing.T) {
@@ -126,17 +168,42 @@ func TestLoadBalance(t *testing.T) {
                ctrl := gomock.NewController(t)
                defer ctrl.Finish()
 
-               m := metrics.NewMockMetrics(ctrl)
-               metrics.LocalMetrics = m
+               emaMetrics := rolling.NewMockMetrics(ctrl)
+               metrics.EMAMetrics = emaMetrics
+               slidingWindowCounterMetrics := rolling.NewMockMetrics(ctrl)
+               metrics.SlidingWindowCounterMetrics = 
slidingWindowCounterMetrics
 
                url0, _ := 
common.NewURL("dubbo://192.168.1.0:20000/com.ikurento.user.UserProvider")
                url1, _ := 
common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
                url2, _ := 
common.NewURL("dubbo://192.168.1.2:20000/com.ikurento.user.UserProvider")
 
-               m.EXPECT().
-                       GetMethodMetrics(gomock.Eq(url0), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
+               //rtt
+               emaMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url0), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.RTT)).
+                       Times(1).
+                       Return(float64(0), utils.ErrMetricsNotFound)
+               emaMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url1), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.RTT)).
+                       Times(1).
+                       Return(float64(5), nil)
+               //requests
+               slidingWindowCounterMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url0), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Requests)).
+                       Times(1).
+                       Return(float64(0), utils.ErrMetricsNotFound)
+               slidingWindowCounterMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url1), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Requests)).
+                       Times(1).
+                       Return(float64(10), nil)
+               //accepts
+               slidingWindowCounterMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url0), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Accepts)).
                        Times(1).
-                       Return(0, metrics.ErrMetricsNotFound)
+                       Return(float64(0), utils.ErrMetricsNotFound)
+               slidingWindowCounterMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url1), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Accepts)).
+                       Times(1).
+                       Return(float64(5), nil)
 
                ivkArr := []protocol.Invoker{
                        protocol.NewBaseInvoker(url0),
@@ -154,22 +221,42 @@ func TestLoadBalance(t *testing.T) {
                ctrl := gomock.NewController(t)
                defer ctrl.Finish()
 
-               m := metrics.NewMockMetrics(ctrl)
-               metrics.LocalMetrics = m
+               emaMetrics := rolling.NewMockMetrics(ctrl)
+               metrics.EMAMetrics = emaMetrics
+               slidingWindowCounterMetrics := rolling.NewMockMetrics(ctrl)
+               metrics.SlidingWindowCounterMetrics = 
slidingWindowCounterMetrics
 
                url0, _ := 
common.NewURL("dubbo://192.168.1.0:20000/com.ikurento.user.UserProvider")
                url1, _ := 
common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
                url2, _ := 
common.NewURL("dubbo://192.168.1.2:20000/com.ikurento.user.UserProvider")
 
-               m.EXPECT().
-                       GetMethodMetrics(gomock.Eq(url0), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
+               //rtt
+               emaMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url0), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.RTT)).
                        Times(1).
-                       Return(uint64(0), nil)
-
-               m.EXPECT().
-                       GetMethodMetrics(gomock.Eq(url1), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
+                       Return(float64(10), nil)
+               emaMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url1), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.RTT)).
+                       Times(1).
+                       Return(float64(0), utils.ErrMetricsNotFound)
+               //requests
+               slidingWindowCounterMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url0), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Requests)).
+                       Times(1).
+                       Return(float64(10), nil)
+               slidingWindowCounterMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url1), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Requests)).
+                       Times(1).
+                       Return(float64(0), utils.ErrMetricsNotFound)
+               //accepts
+               slidingWindowCounterMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url0), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Accepts)).
+                       Times(1).
+                       Return(float64(5), nil)
+               slidingWindowCounterMetrics.EXPECT().
+                       GetMethodMetrics(gomock.Eq(url1), 
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Accepts)).
                        Times(1).
-                       Return(uint64(0), metrics.ErrMetricsNotFound)
+                       Return(float64(0), utils.ErrMetricsNotFound)
 
                ivkArr := []protocol.Invoker{
                        protocol.NewBaseInvoker(url0),
diff --git a/cluster/metrics/constants.go b/cluster/metrics/constants.go
index 2bc6b4fea..557b3f7e6 100644
--- a/cluster/metrics/constants.go
+++ b/cluster/metrics/constants.go
@@ -19,4 +19,7 @@ package metrics
 
 const (
        HillClimbing = "hill-climbing"
+       Requests     = "requests"
+       Accepts      = "accepts"
+       RTT          = "rtt"
 )
diff --git a/cluster/metrics/local_metrics.go b/cluster/metrics/local_metrics.go
index 16c094cb8..a2d221b3f 100644
--- a/cluster/metrics/local_metrics.go
+++ b/cluster/metrics/local_metrics.go
@@ -18,18 +18,28 @@
 package metrics
 
 import (
-       "fmt"
        "sync"
 )
 
 import (
+       "dubbo.apache.org/dubbo-go/v3/cluster/metrics/rolling"
+       "dubbo.apache.org/dubbo-go/v3/cluster/metrics/utils"
        "dubbo.apache.org/dubbo-go/v3/common"
 )
 
-var LocalMetrics Metrics
+var (
+       LocalMetrics                Metrics
+       EMAMetrics                  rolling.Metrics
+       SlidingWindowCounterMetrics rolling.Metrics
+)
 
 func init() {
        LocalMetrics = newLocalMetrics()
+       EMAMetrics = rolling.NewEMAMetrics(rolling.EMAOpts{Alpha: 0.75})
+       SlidingWindowCounterMetrics = 
rolling.NewSlidingWindowCounterMetrics(rolling.SlidingWindowCounterOpts{
+               Size:           10,
+               BucketDuration: 50000000,
+       })
 }
 
 var _ Metrics = (*localMetrics)(nil)
@@ -50,17 +60,17 @@ func newLocalMetrics() *localMetrics {
 func (m *localMetrics) GetMethodMetrics(url *common.URL, methodName, key 
string) (interface{}, error) {
        m.lock.RLock()
        defer m.lock.RUnlock()
-       metricsKey := fmt.Sprintf("%s.%s.%s.%s", getInstanceKey(url), 
getInvokerKey(url), methodName, key)
+       metricsKey := utils.GetMethodMetricsKey(url, methodName, key)
        if metrics, ok := m.metrics[metricsKey]; ok {
                return metrics, nil
        }
-       return nil, ErrMetricsNotFound
+       return nil, utils.ErrMetricsNotFound
 }
 
 func (m *localMetrics) SetMethodMetrics(url *common.URL, methodName, key 
string, value interface{}) error {
        m.lock.Lock()
        defer m.lock.Unlock()
-       metricsKey := fmt.Sprintf("%s.%s.%s.%s", getInstanceKey(url), 
getInvokerKey(url), methodName, key)
+       metricsKey := utils.GetMethodMetricsKey(url, methodName, key)
        m.metrics[metricsKey] = value
        return nil
 }
diff --git a/cluster/metrics/metrics.go b/cluster/metrics/metrics.go
index 0f61dcf1c..7d8240cc2 100644
--- a/cluster/metrics/metrics.go
+++ b/cluster/metrics/metrics.go
@@ -17,18 +17,10 @@
 
 package metrics
 
-import (
-       "github.com/pkg/errors"
-)
-
 import (
        "dubbo.apache.org/dubbo-go/v3/common"
 )
 
-var (
-       ErrMetricsNotFound = errors.New("metrics not found")
-)
-
 type Metrics interface {
        // GetMethodMetrics returns method-level metrics, the format of key is 
"{instance key}.{invoker key}.{method key}.{key}"
        // url is invoker's url, which contains information about instance and 
invoker.
diff --git a/cluster/metrics/utils.go b/cluster/metrics/rolling/ema.go
similarity index 59%
copy from cluster/metrics/utils.go
copy to cluster/metrics/rolling/ema.go
index 65cc9e249..e0219139f 100644
--- a/cluster/metrics/utils.go
+++ b/cluster/metrics/rolling/ema.go
@@ -15,20 +15,42 @@
  * limitations under the License.
  */
 
-package metrics
+package rolling
 
 import (
-       "fmt"
+       "sync"
 )
 
-import (
-       "dubbo.apache.org/dubbo-go/v3/common"
-)
+// EMA is a struct that implements an Exponential Moving Average.
+// val = old * (1 - alpha) + new * alpha
+type EMA struct {
+       mu    sync.RWMutex
+       alpha float64
+       val   float64
+}
+
+type EMAOpts struct {
+       Alpha float64
+}
 
-func getInvokerKey(url *common.URL) string {
-       return url.Path
+// NewEMA creates a new EMA based on the given EMAOpts.
+func NewEMA(opts EMAOpts) *EMA {
+       return &EMA{
+               alpha: opts.Alpha,
+               val:   0,
+       }
 }
 
-func getInstanceKey(url *common.URL) string {
-       return fmt.Sprintf("%s:%s", url.Ip, url.Port)
+func (e *EMA) Append(v float64) {
+       e.mu.Lock()
+       defer e.mu.Unlock()
+
+       e.val = e.val*(1-e.alpha) + v*e.alpha
+}
+
+func (e *EMA) Value() float64 {
+       e.mu.RLock()
+       defer e.mu.RUnlock()
+
+       return e.val
 }
diff --git a/cluster/metrics/rolling/ema_metrics.go 
b/cluster/metrics/rolling/ema_metrics.go
new file mode 100644
index 000000000..f0e69186c
--- /dev/null
+++ b/cluster/metrics/rolling/ema_metrics.go
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rolling
+
+import (
+       "sync"
+)
+
+import (
+       "dubbo.apache.org/dubbo-go/v3/cluster/metrics/utils"
+       "dubbo.apache.org/dubbo-go/v3/common"
+)
+
+var _ Metrics = (*EMAMetrics)(nil)
+
+type EMAMetrics struct {
+       opts    EMAOpts
+       metrics sync.Map
+}
+
+func NewEMAMetrics(opts EMAOpts) *EMAMetrics {
+       return &EMAMetrics{
+               opts: opts,
+       }
+}
+
+func (m *EMAMetrics) GetMethodMetrics(url *common.URL, methodName, key string) 
(float64, error) {
+       metricsKey := utils.GetMethodMetricsKey(url, methodName, key)
+       if metrics, ok := m.metrics.Load(metricsKey); ok {
+               return metrics.(*EMA).Value(), nil
+       }
+       return 0, utils.ErrMetricsNotFound
+}
+
+func (m *EMAMetrics) AppendMethodMetrics(url *common.URL, methodName, key 
string, val float64) error {
+       metricsKey := utils.GetMethodMetricsKey(url, methodName, key)
+       if metrics, ok := m.metrics.Load(metricsKey); ok {
+               metrics.(*EMA).Append(val)
+       } else {
+               metrics, _ = m.metrics.LoadOrStore(metricsKey, NewEMA(m.opts))
+               metrics.(*EMA).Append(val)
+       }
+       return nil
+}
diff --git a/cluster/metrics/utils.go 
b/cluster/metrics/rolling/rolling_metrics.go
similarity index 64%
copy from cluster/metrics/utils.go
copy to cluster/metrics/rolling/rolling_metrics.go
index 65cc9e249..ae7bf39ff 100644
--- a/cluster/metrics/utils.go
+++ b/cluster/metrics/rolling/rolling_metrics.go
@@ -15,20 +15,17 @@
  * limitations under the License.
  */
 
-package metrics
-
-import (
-       "fmt"
-)
+package rolling
 
 import (
        "dubbo.apache.org/dubbo-go/v3/common"
 )
 
-func getInvokerKey(url *common.URL) string {
-       return url.Path
-}
-
-func getInstanceKey(url *common.URL) string {
-       return fmt.Sprintf("%s:%s", url.Ip, url.Port)
+type Metrics interface {
+       // GetMethodMetrics returns method-level metrics, the format of key is 
"{instance key}.{invoker key}.{method key}.{key}"
+       // url is invoker's url, which contains information about instance and 
invoker.
+       // methodName is the method name.
+       // key is the key of the metrics.
+       GetMethodMetrics(url *common.URL, methodName, key string) (float64, 
error)
+       AppendMethodMetrics(url *common.URL, methodName, key string, value 
float64) error
 }
diff --git a/cluster/metrics/rolling/rolling_metrics_mock.go 
b/cluster/metrics/rolling/rolling_metrics_mock.go
new file mode 100644
index 000000000..7bf07b08d
--- /dev/null
+++ b/cluster/metrics/rolling/rolling_metrics_mock.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Code generated by MockGen. DO NOT EDIT.
+// Source: ./rolling_metrics.go
+
+// Package rolling is a generated GoMock package.
+package rolling
+
+import (
+       reflect "reflect"
+)
+
+import (
+       gomock "github.com/golang/mock/gomock"
+)
+
+import (
+       common "dubbo.apache.org/dubbo-go/v3/common"
+)
+
+// MockMetrics is a mock of Metrics interface.
+type MockMetrics struct {
+       ctrl     *gomock.Controller
+       recorder *MockMetricsMockRecorder
+}
+
+// MockMetricsMockRecorder is the mock recorder for MockMetrics.
+type MockMetricsMockRecorder struct {
+       mock *MockMetrics
+}
+
+// NewMockMetrics creates a new mock instance.
+func NewMockMetrics(ctrl *gomock.Controller) *MockMetrics {
+       mock := &MockMetrics{ctrl: ctrl}
+       mock.recorder = &MockMetricsMockRecorder{mock}
+       return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockMetrics) EXPECT() *MockMetricsMockRecorder {
+       return m.recorder
+}
+
+// AppendMethodMetrics mocks base method.
+func (m *MockMetrics) AppendMethodMetrics(url *common.URL, methodName, key 
string, value float64) error {
+       m.ctrl.T.Helper()
+       ret := m.ctrl.Call(m, "AppendMethodMetrics", url, methodName, key, 
value)
+       ret0, _ := ret[0].(error)
+       return ret0
+}
+
+// AppendMethodMetrics indicates an expected call of AppendMethodMetrics.
+func (mr *MockMetricsMockRecorder) AppendMethodMetrics(url, methodName, key, 
value interface{}) *gomock.Call {
+       mr.mock.ctrl.T.Helper()
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"AppendMethodMetrics", reflect.TypeOf((*MockMetrics)(nil).AppendMethodMetrics), 
url, methodName, key, value)
+}
+
+// GetMethodMetrics mocks base method.
+func (m *MockMetrics) GetMethodMetrics(url *common.URL, methodName, key 
string) (float64, error) {
+       m.ctrl.T.Helper()
+       ret := m.ctrl.Call(m, "GetMethodMetrics", url, methodName, key)
+       ret0, _ := ret[0].(float64)
+       ret1, _ := ret[1].(error)
+       return ret0, ret1
+}
+
+// GetMethodMetrics indicates an expected call of GetMethodMetrics.
+func (mr *MockMetricsMockRecorder) GetMethodMetrics(url, methodName, key 
interface{}) *gomock.Call {
+       mr.mock.ctrl.T.Helper()
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"GetMethodMetrics", reflect.TypeOf((*MockMetrics)(nil).GetMethodMetrics), url, 
methodName, key)
+}
diff --git a/cluster/metrics/rolling/sliding_window_counter.go 
b/cluster/metrics/rolling/sliding_window_counter.go
new file mode 100644
index 000000000..ef8d7c673
--- /dev/null
+++ b/cluster/metrics/rolling/sliding_window_counter.go
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rolling
+
+import (
+       "sync"
+       "time"
+)
+
// SlidingWindowCounter is a time-based ring window that keeps the sum of the
// values appended during roughly the last size*bucketDuration.
// The bucket offset moves with time: if the last point was appended at least
// one bucket duration ago, the offset advances and the buckets that expired
// in between are cleared.
type SlidingWindowCounter struct {
	// size is the number of buckets in the ring (>= 1 when built by
	// NewSlidingWindowCounter).
	size int
	// mu guards every field below.
	mu sync.RWMutex
	// buckets holds the per-bucket sums.
	buckets []float64
	// count is the sum of all buckets.
	count float64
	// offset is the index of the bucket currently being written.
	offset int
	// bucketDuration is the time covered by one bucket; a non-positive value
	// disables rotation, turning the counter into a plain accumulator.
	bucketDuration time.Duration
	// lastAppendTime is the start time of the bucket at offset.
	lastAppendTime time.Time
}

// SlidingWindowCounterOpts contains the arguments for creating
// SlidingWindowCounter.
type SlidingWindowCounterOpts struct {
	// Size is the number of buckets; values below 1 are treated as 1.
	Size int
	// BucketDuration is the time span of a single bucket; a non-positive
	// value means the window never rotates.
	BucketDuration time.Duration
}

// NewSlidingWindowCounter creates a new SlidingWindowCounter based on the
// given SlidingWindowCounterOpts. The window starts at the time of the call.
func NewSlidingWindowCounter(opts SlidingWindowCounterOpts) *SlidingWindowCounter {
	size := opts.Size
	if size < 1 {
		// A ring needs at least one bucket; this avoids "% 0" and
		// out-of-range panics on the first Append.
		size = 1
	}

	return &SlidingWindowCounter{
		size:           size,
		buckets:        make([]float64, size),
		bucketDuration: opts.BucketDuration,
		lastAppendTime: time.Now(),
	}
}

// spanOf converts an elapsed duration into a number of whole buckets.
// It returns 0 when rotation is disabled, and c.size (the whole window) when
// elapsed is negative by at least one bucket, i.e. the clock went backwards.
func (c *SlidingWindowCounter) spanOf(elapsed time.Duration) int {
	if c.bucketDuration <= 0 {
		return 0
	}
	v := int(elapsed / c.bucketDuration)
	if v > -1 { // maybe time backwards
		return v
	}
	return c.size
}

// timespan returns how many whole buckets have elapsed since lastAppendTime.
// The caller must hold c.mu.
func (c *SlidingWindowCounter) timespan() int {
	return c.spanOf(time.Since(c.lastAppendTime))
}

// Append adds val to the current bucket, first rotating the window and
// clearing every bucket that expired since the previous rotation.
func (c *SlidingWindowCounter) Append(val float64) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// move offset
	elapsed := time.Since(c.lastAppendTime)
	if span := c.spanOf(elapsed); span > 0 {
		// reset the expired buckets
		c.ResetBuckets(c.offset+1, span)
		c.offset = (c.offset + span%c.size) % c.size
		if elapsed < 0 {
			// The clock moved backwards: restart the window from now rather
			// than pushing lastAppendTime even further into the future,
			// which would wipe the window on every following Append.
			c.lastAppendTime = time.Now()
		} else {
			// Stay aligned to bucket boundaries. The multiplication is done
			// in time.Duration (int64) so it cannot overflow a 32-bit int.
			c.lastAppendTime = c.lastAppendTime.Add(time.Duration(span) * c.bucketDuration)
		}
	}

	c.buckets[c.offset] += val
	c.count += val
}

// Value returns the sum of the values that are still inside the window.
// Buckets that expired since the last Append are excluded from the result
// without being modified, so a counter that stopped receiving data decays
// to zero instead of reporting a stale total.
func (c *SlidingWindowCounter) Value() float64 {
	c.mu.RLock()
	defer c.mu.RUnlock()

	span := c.timespan()
	if span <= 0 {
		return c.count
	}
	if span >= c.size {
		return 0
	}
	total := c.count
	for i := 1; i <= span; i++ {
		total -= c.buckets[(c.offset+i)%c.size]
	}
	return total
}

// ResetBucket empties the bucket based on the given offset.
// The offset is wrapped into the ring, negative offsets included.
// It does not lock; the caller is responsible for holding c.mu.
func (c *SlidingWindowCounter) ResetBucket(offset int) {
	idx := offset % c.size
	if idx < 0 {
		idx += c.size
	}
	c.count -= c.buckets[idx]
	c.buckets[idx] = 0
}

// ResetBuckets empties count consecutive buckets starting at offset.
// At most c.size buckets are reset. It does not lock; the caller is
// responsible for holding c.mu.
func (c *SlidingWindowCounter) ResetBuckets(offset int, count int) {
	if count > c.size {
		count = c.size
	}
	for i := 0; i < count; i++ {
		c.ResetBucket(offset + i)
	}
}
diff --git a/cluster/metrics/rolling/sliding_window_counter_metrics.go 
b/cluster/metrics/rolling/sliding_window_counter_metrics.go
new file mode 100644
index 000000000..87b4b2c3d
--- /dev/null
+++ b/cluster/metrics/rolling/sliding_window_counter_metrics.go
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rolling
+
+import (
+       "sync"
+)
+
+import (
+       "dubbo.apache.org/dubbo-go/v3/cluster/metrics/utils"
+       "dubbo.apache.org/dubbo-go/v3/common"
+)
+
+var _ Metrics = (*SlidingWindowCounterMetrics)(nil)
+
+type SlidingWindowCounterMetrics struct {
+       opts    SlidingWindowCounterOpts
+       metrics sync.Map
+}
+
+func NewSlidingWindowCounterMetrics(opts SlidingWindowCounterOpts) 
*SlidingWindowCounterMetrics {
+       return &SlidingWindowCounterMetrics{
+               opts: opts,
+       }
+}
+
+func (m *SlidingWindowCounterMetrics) GetMethodMetrics(url *common.URL, 
methodName, key string) (float64, error) {
+       metricsKey := utils.GetMethodMetricsKey(url, methodName, key)
+       if metrics, ok := m.metrics.Load(metricsKey); ok {
+               return metrics.(*SlidingWindowCounter).Value(), nil
+       }
+       return 0, utils.ErrMetricsNotFound
+}
+
+func (m *SlidingWindowCounterMetrics) AppendMethodMetrics(url *common.URL, 
methodName, key string, val float64) error {
+       metricsKey := utils.GetMethodMetricsKey(url, methodName, key)
+       if metrics, ok := m.metrics.Load(metricsKey); ok {
+               metrics.(*SlidingWindowCounter).Append(val)
+       } else {
+               metrics, _ = m.metrics.LoadOrStore(metricsKey, 
NewSlidingWindowCounter(m.opts))
+               metrics.(*SlidingWindowCounter).Append(val)
+       }
+       return nil
+}
diff --git a/cluster/metrics/utils.go b/cluster/metrics/utils/utils.go
similarity index 53%
rename from cluster/metrics/utils.go
rename to cluster/metrics/utils/utils.go
index 65cc9e249..e84312b17 100644
--- a/cluster/metrics/utils.go
+++ b/cluster/metrics/utils/utils.go
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-package metrics
+package utils
 
 import (
+       "errors"
        "fmt"
 )
 
@@ -25,10 +26,50 @@ import (
        "dubbo.apache.org/dubbo-go/v3/common"
 )
 
-func getInvokerKey(url *common.URL) string {
// ErrMetricsNotFound is the sentinel error for a lookup of metrics that do
// not exist.
var ErrMetricsNotFound = errors.New("metrics not found")
+
+func GetInvokerKey(url *common.URL) string {
        return url.Path
 }
 
-func getInstanceKey(url *common.URL) string {
+func GetInstanceKey(url *common.URL) string {
        return fmt.Sprintf("%s:%s", url.Ip, url.Port)
 }
+
// ToFloat64 converts a numeric value held in i to float64.
// It accepts float64, float32 and every built-in signed and unsigned integer
// type (including plain int). A nil interface or a value of any other type
// yields 0.
func ToFloat64(i interface{}) float64 {
	// A nil interface matches no case and falls through to default.
	switch v := i.(type) {
	case float64:
		return v
	case float32:
		return float64(v)
	case int:
		return float64(v)
	case int64:
		return float64(v)
	case int32:
		return float64(v)
	case int16:
		return float64(v)
	case int8:
		return float64(v)
	case uint:
		return float64(v)
	case uint64:
		return float64(v)
	case uint32:
		return float64(v)
	case uint16:
		return float64(v)
	case uint8:
		return float64(v)
	default:
		return 0
	}
}
+
+func GetMethodMetricsKey(url *common.URL, methodName, key string) string {
+       return fmt.Sprintf("%s.%s.%s.%s", GetInstanceKey(url), 
GetInvokerKey(url), methodName, key)
+}
diff --git a/cluster/utils/adaptivesvc.go b/cluster/utils/adaptivesvc.go
index e92f549ec..04edec20b 100644
--- a/cluster/utils/adaptivesvc.go
+++ b/cluster/utils/adaptivesvc.go
@@ -18,6 +18,8 @@
 package utils
 
 import (
+       "context"
+       "errors"
        "fmt"
        "strings"
 )
@@ -44,3 +46,10 @@ func IsAdaptiveServiceFailed(err error) bool {
        }
        return strings.HasPrefix(err.Error(), 
adaptivesvc.ErrAdaptiveSvcInterrupted.Error())
 }
+
// IsDeadlineExceeded reports whether err is, or wraps,
// context.DeadlineExceeded. A nil err yields false.
func IsDeadlineExceeded(err error) bool {
	// errors.Is already returns false for a nil err, so no explicit nil
	// guard is needed.
	return errors.Is(err, context.DeadlineExceeded)
}


Reply via email to