This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 3.0-adaptive-stream
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0-adaptive-stream by this
push:
new 4ed7c71cd feat: adaptive load balancer of adaptive service (#2067)
4ed7c71cd is described below
commit 4ed7c71cd850b07ebb0fa93e695f981d299a4bc3
Author: Zhang Yepeng <[email protected]>
AuthorDate: Sat Dec 10 13:43:40 2022 +0800
feat: adaptive load balancer of adaptive service (#2067)
* feat(adaptive service): add success rate and rtt to p2c's weight
* feat(adaptive service): p2c weight
* feat(adaptive service): ema
* fix license
* fix time
* fix error tests
* fix p2c tests
* remove useless division calculations
* move ema and slidingWindowCounter to RollingMetrics
* make reading from ema and slidingWindowCounter thread-safe
* add GetMethodMetricsKey method
* validate rolling metrics implementation
* add license
* fix p2c tests
* add license
* rename sliding_window_counter_metrics
* remove useless factor
---
cluster/cluster/adaptivesvc/cluster_invoker.go | 52 +++----
cluster/loadbalance/p2c/loadbalance.go | 87 ++++++------
cluster/loadbalance/p2c/loadbalance_test.go | 155 ++++++++++++++++-----
cluster/metrics/constants.go | 3 +
cluster/metrics/local_metrics.go | 20 ++-
cluster/metrics/metrics.go | 8 --
cluster/metrics/{utils.go => rolling/ema.go} | 40 ++++--
cluster/metrics/rolling/ema_metrics.go | 59 ++++++++
.../{utils.go => rolling/rolling_metrics.go} | 19 ++-
cluster/metrics/rolling/rolling_metrics_mock.go | 86 ++++++++++++
cluster/metrics/rolling/sliding_window_counter.go | 106 ++++++++++++++
.../rolling/sliding_window_counter_metrics.go | 59 ++++++++
cluster/metrics/{ => utils}/utils.go | 47 ++++++-
cluster/utils/adaptivesvc.go | 9 ++
14 files changed, 604 insertions(+), 146 deletions(-)
diff --git a/cluster/cluster/adaptivesvc/cluster_invoker.go
b/cluster/cluster/adaptivesvc/cluster_invoker.go
index 7eaf456e8..ebee38faa 100644
--- a/cluster/cluster/adaptivesvc/cluster_invoker.go
+++ b/cluster/cluster/adaptivesvc/cluster_invoker.go
@@ -19,12 +19,10 @@ package adaptivesvc
import (
"context"
- "strconv"
+ "time"
)
import (
- "github.com/dubbogo/gost/log/logger"
-
perrors "github.com/pkg/errors"
)
@@ -65,45 +63,29 @@ func (ivk *adaptiveServiceClusterInvoker) Invoke(ctx
context.Context, invocation
// select a node by the loadBalance
invoker := lb.Select(invokers, invocation)
-
+ _ =
metrics.SlidingWindowCounterMetrics.AppendMethodMetrics(invoker.GetURL(),
invocation.MethodName(), metrics.Requests, 1)
// invoke
invocation.SetAttachment(constant.AdaptiveServiceEnabledKey,
constant.AdaptiveServiceIsEnabled)
+ startTime := time.Now().UnixNano()
result := invoker.Invoke(ctx, invocation)
-
+ rtt := time.Now().UnixNano() - startTime
// if the adaptive service encounters an error, DO NOT
// update the metrics.
- if clsutils.IsAdaptiveServiceFailed(result.Error()) {
+ if shouldDrop(result.Error()) {
return result
}
+ _ = metrics.EMAMetrics.AppendMethodMetrics(invoker.GetURL(),
invocation.MethodName(), metrics.RTT, float64(rtt))
+ _ =
metrics.SlidingWindowCounterMetrics.AppendMethodMetrics(invoker.GetURL(),
invocation.MethodName(), metrics.Accepts, 1)
+ return result
+}
- // update metrics
- var remainingStr string
- remainingIface :=
result.Attachment(constant.AdaptiveServiceRemainingKey, nil)
- if remainingIface != nil {
- if str, strOK := remainingIface.(string); strOK {
- remainingStr = str
- } else if strArr, strArrOK := remainingIface.([]string);
strArrOK && len(strArr) > 0 {
- remainingStr = strArr[0]
- }
- }
- if remainingStr == "" {
- logger.Errorf("[adasvc cluster] The %s field type of value %v
should be string.",
- constant.AdaptiveServiceRemainingKey, remainingIface)
- return result
+func shouldDrop(err error) bool {
+ switch {
+ case clsutils.IsAdaptiveServiceFailed(err):
+ return true
+ case clsutils.IsDeadlineExceeded(err):
+ return true
+ default:
+ return false
}
- remaining, err := strconv.Atoi(remainingStr)
- if err != nil {
- logger.Warnf("the remaining is unexpected, we need a int type,
but we got %s, err: %v.", remainingStr, err)
- return result
- }
- logger.Debugf("[adasvc cluster] The server status was received
successfully, %s: %#v",
- constant.AdaptiveServiceRemainingKey, remainingStr)
- err = metrics.LocalMetrics.SetMethodMetrics(invoker.GetURL(),
- invocation.MethodName(), metrics.HillClimbing,
uint64(remaining))
- if err != nil {
- logger.Warnf("adaptive service metrics update is failed, err:
%v", err)
- return &protocol.RPCResult{Err: err}
- }
-
- return result
}
diff --git a/cluster/loadbalance/p2c/loadbalance.go
b/cluster/loadbalance/p2c/loadbalance.go
index 12f2a37c5..1f60277ec 100644
--- a/cluster/loadbalance/p2c/loadbalance.go
+++ b/cluster/loadbalance/p2c/loadbalance.go
@@ -18,8 +18,6 @@
package p2c
import (
- "errors"
- "fmt"
"math/rand"
"sync"
"time"
@@ -32,6 +30,7 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
"dubbo.apache.org/dubbo-go/v3/cluster/metrics"
+ "dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/protocol"
@@ -71,9 +70,6 @@ func (l *p2cLoadBalance) Select(invokers []protocol.Invoker,
invocation protocol
if len(invokers) == 1 {
return invokers[0]
}
- // m is the Metrics, which saves the metrics of instance, invokers and
methods
- // The local metrics is available only for the earlier version.
- m := metrics.LocalMetrics
// picks two nodes randomly
var i, j int
if len(invokers) == 2 {
@@ -89,45 +85,11 @@ func (l *p2cLoadBalance) Select(invokers
[]protocol.Invoker, invocation protocol
i, invokers[i], j, invokers[j])
methodName := invocation.ActualMethodName()
- // remainingIIface, remainingJIface means remaining capacity of node i
and node j.
- // If one of the metrics is empty, invoke the invocation to that node
directly.
- remainingIIface, err := m.GetMethodMetrics(invokers[i].GetURL(),
methodName, metrics.HillClimbing)
- if err != nil {
- if errors.Is(err, metrics.ErrMetricsNotFound) {
- logger.Debugf("[P2C select] The invoker[%d] was
selected, because it hasn't been selected before.", i)
- return invokers[i]
- }
- logger.Warnf("get method metrics err: %v", err)
- return nil
- }
- // TODO(justxuewei): It should have a strategy to drop some metrics
after a period of time.
- remainingJIface, err := m.GetMethodMetrics(invokers[j].GetURL(),
methodName, metrics.HillClimbing)
- if err != nil {
- if errors.Is(err, metrics.ErrMetricsNotFound) {
- logger.Debugf("[P2C select] The invoker[%d] was
selected, because it hasn't been selected before.", j)
- return invokers[j]
- }
- logger.Warnf("get method metrics err: %v", err)
- return nil
- }
-
- // Convert interface to int, if the type is unexpected, panic
immediately
- remainingI, ok := remainingIIface.(uint64)
- if !ok {
- panic(fmt.Sprintf("[P2C select] the type of %s expects to be
uint64, but gets %T",
- metrics.HillClimbing, remainingIIface))
- }
-
- remainingJ, ok := remainingJIface.(uint64)
- if !ok {
- panic(fmt.Sprintf("the type of %s expects to be uint64, but
gets %T", metrics.HillClimbing, remainingJIface))
- }
-
- logger.Debugf("[P2C select] The invoker[%d] remaining is %d, and the
invoker[%d] is %d.", i, remainingI, j, remainingJ)
+ weightI, weightJ := Weight(invokers[i].GetURL(), invokers[j].GetURL(),
methodName)
// For the remaining capacity, the bigger, the better.
- if remainingI > remainingJ {
+ if weightI > weightJ {
logger.Debugf("[P2C select] The invoker[%d] was selected.", i)
return invokers[i]
}
@@ -135,3 +97,46 @@ func (l *p2cLoadBalance) Select(invokers
[]protocol.Invoker, invocation protocol
logger.Debugf("[P2C select] The invoker[%d] was selected.", j)
return invokers[j]
}
+
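+// Weight returns w_i = s_i + ε*t_i for each URL, where s_i is the success rate score, t_i is the normalized RTT score and ε = (s1 + s2) / (t1 + t2).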
+func Weight(url1, url2 *common.URL, methodName string) (weight1, weight2
float64) {
+
+ s1 := successRateWeight(url1, methodName)
+ s2 := successRateWeight(url2, methodName)
+
+ rtt1, _ := metrics.EMAMetrics.GetMethodMetrics(url1, methodName,
metrics.RTT)
+ rtt2, _ := metrics.EMAMetrics.GetMethodMetrics(url2, methodName,
metrics.RTT)
+
+ logger.Debugf("[P2C Weight Metrics] [invoker1] %s's s score: %f, rtt:
%f; [invoker2] %s's s score: %f, rtt: %f.",
+ url1.Ip, s1, rtt1, url2.Ip, s2, rtt2)
+ avgRtt := (rtt1 + rtt2) / 2
+ t1 := normalize((1 + avgRtt) / (1 + rtt1))
+ t2 := normalize((1 + avgRtt) / (1 + rtt2))
+
+ e := (s1 + s2) / (t1 + t2)
+
+ weight1 = s1 + e*t1
+ weight2 = s2 + e*t2
+ logger.Debugf("[P2C Weight] [invoker1] %s's s score: %f, t score: %f,
weight: %f; [invoker2] %s's s score: %f, t score: %f, weight: %f.",
+ url1.Ip, s1, t1, weight1, url2.Ip, s2, t2, weight2)
+ return weight1, weight2
+}
+
+func normalize(x float64) float64 {
+ return x / (x + 1)
+}
+
+func successRateWeight(url *common.URL, methodName string) float64 {
+ requests, _ :=
metrics.SlidingWindowCounterMetrics.GetMethodMetrics(url, methodName,
metrics.Requests)
+ accepts, _ := metrics.SlidingWindowCounterMetrics.GetMethodMetrics(url,
methodName, metrics.Accepts)
+
+ r := (1 + accepts) / (1 + requests)
+
+ // r may be greater than 1 because SlidingWindowCounter collects only the most
recent data and there is a delay in receiving a response.
+ if r > 1 {
+ r = 1
+ }
+ logger.Debugf("[P2C Weight] [Success Rate] %s requests: %f, accepts:
%f, success rate: %f.",
+ url.Ip, requests, accepts, r)
+ return r
+}
diff --git a/cluster/loadbalance/p2c/loadbalance_test.go
b/cluster/loadbalance/p2c/loadbalance_test.go
index ff27ed0a6..e74719552 100644
--- a/cluster/loadbalance/p2c/loadbalance_test.go
+++ b/cluster/loadbalance/p2c/loadbalance_test.go
@@ -30,6 +30,8 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/cluster/metrics"
+ "dubbo.apache.org/dubbo-go/v3/cluster/metrics/rolling"
+ "dubbo.apache.org/dubbo-go/v3/cluster/metrics/utils"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/protocol"
protoinvoc "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
@@ -64,21 +66,40 @@ func TestLoadBalance(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
- m := metrics.NewMockMetrics(ctrl)
- metrics.LocalMetrics = m
+ emaMetrics := rolling.NewMockMetrics(ctrl)
+ metrics.EMAMetrics = emaMetrics
+ slidingWindowCounterMetrics := rolling.NewMockMetrics(ctrl)
+ metrics.SlidingWindowCounterMetrics =
slidingWindowCounterMetrics
url0, _ :=
common.NewURL("dubbo://192.168.1.0:20000/com.ikurento.user.UserProvider")
url1, _ :=
common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
-
- m.EXPECT().
- GetMethodMetrics(gomock.Eq(url0),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
+ //rtt
+ emaMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url0),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.RTT)).
Times(1).
- Return(uint64(10), nil)
- m.EXPECT().
- GetMethodMetrics(gomock.Eq(url1),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
+ Return(float64(10), nil)
+ emaMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url1),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.RTT)).
Times(1).
- Return(uint64(5), nil)
-
+ Return(float64(5), nil)
+ //requests
+ slidingWindowCounterMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url0),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Requests)).
+ Times(1).
+ Return(float64(10), nil)
+ slidingWindowCounterMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url1),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Requests)).
+ Times(1).
+ Return(float64(10), nil)
+ //accepts
+ slidingWindowCounterMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url0),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Accepts)).
+ Times(1).
+ Return(float64(5), nil)
+ slidingWindowCounterMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url1),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Accepts)).
+ Times(1).
+ Return(float64(5), nil)
ivkArr := []protocol.Invoker{
protocol.NewBaseInvoker(url0),
protocol.NewBaseInvoker(url1),
@@ -86,7 +107,7 @@ func TestLoadBalance(t *testing.T) {
ivk := lb.Select(ivkArr, invocation)
- assert.Equal(t, ivkArr[0].GetURL().String(),
ivk.GetURL().String())
+ assert.Equal(t, ivkArr[1].GetURL().String(),
ivk.GetURL().String())
})
t.Run("multiple invokers", func(t *testing.T) {
@@ -94,21 +115,42 @@ func TestLoadBalance(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
- m := metrics.NewMockMetrics(ctrl)
- metrics.LocalMetrics = m
+ emaMetrics := rolling.NewMockMetrics(ctrl)
+ metrics.EMAMetrics = emaMetrics
+ slidingWindowCounterMetrics := rolling.NewMockMetrics(ctrl)
+ metrics.SlidingWindowCounterMetrics =
slidingWindowCounterMetrics
url0, _ :=
common.NewURL("dubbo://192.168.1.0:20000/com.ikurento.user.UserProvider")
url1, _ :=
common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
url2, _ :=
common.NewURL("dubbo://192.168.1.2:20000/com.ikurento.user.UserProvider")
- m.EXPECT().
- GetMethodMetrics(gomock.Eq(url0),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
+ //rtt
+ emaMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url0),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.RTT)).
+ Times(1).
+ Return(float64(10), nil)
+ emaMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url1),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.RTT)).
+ Times(1).
+ Return(float64(5), nil)
+ //requests
+ slidingWindowCounterMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url0),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Requests)).
+ Times(1).
+ Return(float64(10), nil)
+ slidingWindowCounterMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url1),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Requests)).
+ Times(1).
+ Return(float64(10), nil)
+ //accepts
+ slidingWindowCounterMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url0),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Accepts)).
Times(1).
- Return(uint64(10), nil)
- m.EXPECT().
- GetMethodMetrics(gomock.Eq(url1),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
+ Return(float64(5), nil)
+ slidingWindowCounterMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url1),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Accepts)).
Times(1).
- Return(uint64(5), nil)
+ Return(float64(5), nil)
ivkArr := []protocol.Invoker{
protocol.NewBaseInvoker(url0),
@@ -118,7 +160,7 @@ func TestLoadBalance(t *testing.T) {
ivk := lb.Select(ivkArr, invocation)
- assert.Equal(t, ivkArr[0].GetURL().String(),
ivk.GetURL().String())
+ assert.Equal(t, ivkArr[1].GetURL().String(),
ivk.GetURL().String())
})
t.Run("metrics i not found", func(t *testing.T) {
@@ -126,17 +168,42 @@ func TestLoadBalance(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
- m := metrics.NewMockMetrics(ctrl)
- metrics.LocalMetrics = m
+ emaMetrics := rolling.NewMockMetrics(ctrl)
+ metrics.EMAMetrics = emaMetrics
+ slidingWindowCounterMetrics := rolling.NewMockMetrics(ctrl)
+ metrics.SlidingWindowCounterMetrics =
slidingWindowCounterMetrics
url0, _ :=
common.NewURL("dubbo://192.168.1.0:20000/com.ikurento.user.UserProvider")
url1, _ :=
common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
url2, _ :=
common.NewURL("dubbo://192.168.1.2:20000/com.ikurento.user.UserProvider")
- m.EXPECT().
- GetMethodMetrics(gomock.Eq(url0),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
+ //rtt
+ emaMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url0),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.RTT)).
+ Times(1).
+ Return(float64(0), utils.ErrMetricsNotFound)
+ emaMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url1),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.RTT)).
+ Times(1).
+ Return(float64(5), nil)
+ //requests
+ slidingWindowCounterMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url0),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Requests)).
+ Times(1).
+ Return(float64(0), utils.ErrMetricsNotFound)
+ slidingWindowCounterMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url1),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Requests)).
+ Times(1).
+ Return(float64(10), nil)
+ //accepts
+ slidingWindowCounterMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url0),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Accepts)).
Times(1).
- Return(0, metrics.ErrMetricsNotFound)
+ Return(float64(0), utils.ErrMetricsNotFound)
+ slidingWindowCounterMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url1),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Accepts)).
+ Times(1).
+ Return(float64(5), nil)
ivkArr := []protocol.Invoker{
protocol.NewBaseInvoker(url0),
@@ -154,22 +221,42 @@ func TestLoadBalance(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
- m := metrics.NewMockMetrics(ctrl)
- metrics.LocalMetrics = m
+ emaMetrics := rolling.NewMockMetrics(ctrl)
+ metrics.EMAMetrics = emaMetrics
+ slidingWindowCounterMetrics := rolling.NewMockMetrics(ctrl)
+ metrics.SlidingWindowCounterMetrics =
slidingWindowCounterMetrics
url0, _ :=
common.NewURL("dubbo://192.168.1.0:20000/com.ikurento.user.UserProvider")
url1, _ :=
common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
url2, _ :=
common.NewURL("dubbo://192.168.1.2:20000/com.ikurento.user.UserProvider")
- m.EXPECT().
- GetMethodMetrics(gomock.Eq(url0),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
+ //rtt
+ emaMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url0),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.RTT)).
Times(1).
- Return(uint64(0), nil)
-
- m.EXPECT().
- GetMethodMetrics(gomock.Eq(url1),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
+ Return(float64(10), nil)
+ emaMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url1),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.RTT)).
+ Times(1).
+ Return(float64(0), utils.ErrMetricsNotFound)
+ //requests
+ slidingWindowCounterMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url0),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Requests)).
+ Times(1).
+ Return(float64(10), nil)
+ slidingWindowCounterMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url1),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Requests)).
+ Times(1).
+ Return(float64(0), utils.ErrMetricsNotFound)
+ //accepts
+ slidingWindowCounterMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url0),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Accepts)).
+ Times(1).
+ Return(float64(5), nil)
+ slidingWindowCounterMetrics.EXPECT().
+ GetMethodMetrics(gomock.Eq(url1),
gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.Accepts)).
Times(1).
- Return(uint64(0), metrics.ErrMetricsNotFound)
+ Return(float64(0), utils.ErrMetricsNotFound)
ivkArr := []protocol.Invoker{
protocol.NewBaseInvoker(url0),
diff --git a/cluster/metrics/constants.go b/cluster/metrics/constants.go
index 2bc6b4fea..557b3f7e6 100644
--- a/cluster/metrics/constants.go
+++ b/cluster/metrics/constants.go
@@ -19,4 +19,7 @@ package metrics
const (
HillClimbing = "hill-climbing"
+ Requests = "requests"
+ Accepts = "accepts"
+ RTT = "rtt"
)
diff --git a/cluster/metrics/local_metrics.go b/cluster/metrics/local_metrics.go
index 16c094cb8..a2d221b3f 100644
--- a/cluster/metrics/local_metrics.go
+++ b/cluster/metrics/local_metrics.go
@@ -18,18 +18,28 @@
package metrics
import (
- "fmt"
"sync"
)
import (
+ "dubbo.apache.org/dubbo-go/v3/cluster/metrics/rolling"
+ "dubbo.apache.org/dubbo-go/v3/cluster/metrics/utils"
"dubbo.apache.org/dubbo-go/v3/common"
)
-var LocalMetrics Metrics
+var (
+ LocalMetrics Metrics
+ EMAMetrics rolling.Metrics
+ SlidingWindowCounterMetrics rolling.Metrics
+)
func init() {
LocalMetrics = newLocalMetrics()
+ EMAMetrics = rolling.NewEMAMetrics(rolling.EMAOpts{Alpha: 0.75})
+ SlidingWindowCounterMetrics =
rolling.NewSlidingWindowCounterMetrics(rolling.SlidingWindowCounterOpts{
+ Size: 10,
+ BucketDuration: 50000000,
+ })
}
var _ Metrics = (*localMetrics)(nil)
@@ -50,17 +60,17 @@ func newLocalMetrics() *localMetrics {
func (m *localMetrics) GetMethodMetrics(url *common.URL, methodName, key
string) (interface{}, error) {
m.lock.RLock()
defer m.lock.RUnlock()
- metricsKey := fmt.Sprintf("%s.%s.%s.%s", getInstanceKey(url),
getInvokerKey(url), methodName, key)
+ metricsKey := utils.GetMethodMetricsKey(url, methodName, key)
if metrics, ok := m.metrics[metricsKey]; ok {
return metrics, nil
}
- return nil, ErrMetricsNotFound
+ return nil, utils.ErrMetricsNotFound
}
func (m *localMetrics) SetMethodMetrics(url *common.URL, methodName, key
string, value interface{}) error {
m.lock.Lock()
defer m.lock.Unlock()
- metricsKey := fmt.Sprintf("%s.%s.%s.%s", getInstanceKey(url),
getInvokerKey(url), methodName, key)
+ metricsKey := utils.GetMethodMetricsKey(url, methodName, key)
m.metrics[metricsKey] = value
return nil
}
diff --git a/cluster/metrics/metrics.go b/cluster/metrics/metrics.go
index 0f61dcf1c..7d8240cc2 100644
--- a/cluster/metrics/metrics.go
+++ b/cluster/metrics/metrics.go
@@ -17,18 +17,10 @@
package metrics
-import (
- "github.com/pkg/errors"
-)
-
import (
"dubbo.apache.org/dubbo-go/v3/common"
)
-var (
- ErrMetricsNotFound = errors.New("metrics not found")
-)
-
type Metrics interface {
// GetMethodMetrics returns method-level metrics, the format of key is
"{instance key}.{invoker key}.{method key}.{key}"
// url is invoker's url, which contains information about instance and
invoker.
diff --git a/cluster/metrics/utils.go b/cluster/metrics/rolling/ema.go
similarity index 59%
copy from cluster/metrics/utils.go
copy to cluster/metrics/rolling/ema.go
index 65cc9e249..e0219139f 100644
--- a/cluster/metrics/utils.go
+++ b/cluster/metrics/rolling/ema.go
@@ -15,20 +15,42 @@
* limitations under the License.
*/
-package metrics
+package rolling
import (
- "fmt"
+ "sync"
)
-import (
- "dubbo.apache.org/dubbo-go/v3/common"
-)
+// EMA is a struct implementing an Exponential Moving Average.
+// val = old * (1 - alpha) + new * alpha
+type EMA struct {
+ mu sync.RWMutex
+ alpha float64
+ val float64
+}
+
+type EMAOpts struct {
+ Alpha float64
+}
-func getInvokerKey(url *common.URL) string {
- return url.Path
+// NewEMA creates a new EMA based on the given EMAOpts.
+func NewEMA(opts EMAOpts) *EMA {
+ return &EMA{
+ alpha: opts.Alpha,
+ val: 0,
+ }
}
-func getInstanceKey(url *common.URL) string {
- return fmt.Sprintf("%s:%s", url.Ip, url.Port)
+func (e *EMA) Append(v float64) {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+
+ e.val = e.val*(1-e.alpha) + v*e.alpha
+}
+
+func (e *EMA) Value() float64 {
+ e.mu.RLock()
+ defer e.mu.RUnlock()
+
+ return e.val
}
diff --git a/cluster/metrics/rolling/ema_metrics.go
b/cluster/metrics/rolling/ema_metrics.go
new file mode 100644
index 000000000..f0e69186c
--- /dev/null
+++ b/cluster/metrics/rolling/ema_metrics.go
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rolling
+
+import (
+ "sync"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/cluster/metrics/utils"
+ "dubbo.apache.org/dubbo-go/v3/common"
+)
+
+var _ Metrics = (*EMAMetrics)(nil)
+
+type EMAMetrics struct {
+ opts EMAOpts
+ metrics sync.Map
+}
+
+func NewEMAMetrics(opts EMAOpts) *EMAMetrics {
+ return &EMAMetrics{
+ opts: opts,
+ }
+}
+
+func (m *EMAMetrics) GetMethodMetrics(url *common.URL, methodName, key string)
(float64, error) {
+ metricsKey := utils.GetMethodMetricsKey(url, methodName, key)
+ if metrics, ok := m.metrics.Load(metricsKey); ok {
+ return metrics.(*EMA).Value(), nil
+ }
+ return 0, utils.ErrMetricsNotFound
+}
+
+func (m *EMAMetrics) AppendMethodMetrics(url *common.URL, methodName, key
string, val float64) error {
+ metricsKey := utils.GetMethodMetricsKey(url, methodName, key)
+ if metrics, ok := m.metrics.Load(metricsKey); ok {
+ metrics.(*EMA).Append(val)
+ } else {
+ metrics, _ = m.metrics.LoadOrStore(metricsKey, NewEMA(m.opts))
+ metrics.(*EMA).Append(val)
+ }
+ return nil
+}
diff --git a/cluster/metrics/utils.go
b/cluster/metrics/rolling/rolling_metrics.go
similarity index 64%
copy from cluster/metrics/utils.go
copy to cluster/metrics/rolling/rolling_metrics.go
index 65cc9e249..ae7bf39ff 100644
--- a/cluster/metrics/utils.go
+++ b/cluster/metrics/rolling/rolling_metrics.go
@@ -15,20 +15,17 @@
* limitations under the License.
*/
-package metrics
-
-import (
- "fmt"
-)
+package rolling
import (
"dubbo.apache.org/dubbo-go/v3/common"
)
-func getInvokerKey(url *common.URL) string {
- return url.Path
-}
-
-func getInstanceKey(url *common.URL) string {
- return fmt.Sprintf("%s:%s", url.Ip, url.Port)
+type Metrics interface {
+ // GetMethodMetrics returns method-level metrics, the format of key is
"{instance key}.{invoker key}.{method key}.{key}"
+ // url is invoker's url, which contains information about instance and
invoker.
+ // methodName is the method name.
+ // key is the key of the metrics.
+ GetMethodMetrics(url *common.URL, methodName, key string) (float64,
error)
+ AppendMethodMetrics(url *common.URL, methodName, key string, value
float64) error
}
diff --git a/cluster/metrics/rolling/rolling_metrics_mock.go
b/cluster/metrics/rolling/rolling_metrics_mock.go
new file mode 100644
index 000000000..7bf07b08d
--- /dev/null
+++ b/cluster/metrics/rolling/rolling_metrics_mock.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Code generated by MockGen. DO NOT EDIT.
+// Source: ./rolling_metrics.go
+
+// Package rolling is a generated GoMock package.
+package rolling
+
+import (
+ reflect "reflect"
+)
+
+import (
+ gomock "github.com/golang/mock/gomock"
+)
+
+import (
+ common "dubbo.apache.org/dubbo-go/v3/common"
+)
+
+// MockMetrics is a mock of Metrics interface.
+type MockMetrics struct {
+ ctrl *gomock.Controller
+ recorder *MockMetricsMockRecorder
+}
+
+// MockMetricsMockRecorder is the mock recorder for MockMetrics.
+type MockMetricsMockRecorder struct {
+ mock *MockMetrics
+}
+
+// NewMockMetrics creates a new mock instance.
+func NewMockMetrics(ctrl *gomock.Controller) *MockMetrics {
+ mock := &MockMetrics{ctrl: ctrl}
+ mock.recorder = &MockMetricsMockRecorder{mock}
+ return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockMetrics) EXPECT() *MockMetricsMockRecorder {
+ return m.recorder
+}
+
+// AppendMethodMetrics mocks base method.
+func (m *MockMetrics) AppendMethodMetrics(url *common.URL, methodName, key
string, value float64) error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "AppendMethodMetrics", url, methodName, key,
value)
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// AppendMethodMetrics indicates an expected call of AppendMethodMetrics.
+func (mr *MockMetricsMockRecorder) AppendMethodMetrics(url, methodName, key,
value interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"AppendMethodMetrics", reflect.TypeOf((*MockMetrics)(nil).AppendMethodMetrics),
url, methodName, key, value)
+}
+
+// GetMethodMetrics mocks base method.
+func (m *MockMetrics) GetMethodMetrics(url *common.URL, methodName, key
string) (float64, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "GetMethodMetrics", url, methodName, key)
+ ret0, _ := ret[0].(float64)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// GetMethodMetrics indicates an expected call of GetMethodMetrics.
+func (mr *MockMetricsMockRecorder) GetMethodMetrics(url, methodName, key
interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"GetMethodMetrics", reflect.TypeOf((*MockMetrics)(nil).GetMethodMetrics), url,
methodName, key)
+}
diff --git a/cluster/metrics/rolling/sliding_window_counter.go
b/cluster/metrics/rolling/sliding_window_counter.go
new file mode 100644
index 000000000..ef8d7c673
--- /dev/null
+++ b/cluster/metrics/rolling/sliding_window_counter.go
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rolling
+
+import (
+ "sync"
+ "time"
+)
+
+// SlidingWindowCounter is a ring-window counter based on time duration.
+// SlidingWindowCounter moves the bucket offset as time passes,
+// e.g. if the last point was appended one bucket duration ago,
+// SlidingWindowCounter will increment the current offset.
+type SlidingWindowCounter struct {
+ size int
+ mu sync.RWMutex
+ buckets []float64
+ count float64
+ offset int
+ bucketDuration time.Duration
+ lastAppendTime time.Time
+}
+
+// SlidingWindowCounterOpts contains the arguments for creating
SlidingWindowCounter.
+type SlidingWindowCounterOpts struct {
+ Size int
+ BucketDuration time.Duration
+}
+
+// NewSlidingWindowCounter creates a new SlidingWindowCounter based on the
given SlidingWindowCounterOpts.
+func NewSlidingWindowCounter(opts SlidingWindowCounterOpts)
*SlidingWindowCounter {
+ buckets := make([]float64, opts.Size)
+
+ return &SlidingWindowCounter{
+ size: opts.Size,
+ offset: 0,
+ buckets: buckets,
+ bucketDuration: opts.BucketDuration,
+ lastAppendTime: time.Now(),
+ }
+}
+
+func (c *SlidingWindowCounter) timespan() int {
+ v := int(time.Since(c.lastAppendTime) / c.bucketDuration)
+ if v > -1 { // maybe time backwards
+ return v
+ }
+ return c.size
+}
+
+func (c *SlidingWindowCounter) Append(val float64) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ // move offset
+ timespan := c.timespan()
+ if timespan > 0 {
+ start := (c.offset + 1) % c.size
+ end := (c.offset + timespan) % c.size
+ // reset the expired buckets
+ c.ResetBuckets(start, timespan)
+ c.offset = end
+ c.lastAppendTime = c.lastAppendTime.Add(time.Duration(timespan
* int(c.bucketDuration)))
+ }
+
+ c.buckets[c.offset] += val
+ c.count += val
+}
+
+func (c *SlidingWindowCounter) Value() float64 {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+
+ return c.count
+}
+
+// ResetBucket empties the bucket based on the given offset.
+func (c *SlidingWindowCounter) ResetBucket(offset int) {
+ c.count -= c.buckets[offset%c.size]
+ c.buckets[offset%c.size] = 0
+}
+
+// ResetBuckets empties count consecutive buckets starting at offset (at most one full ring).
+func (c *SlidingWindowCounter) ResetBuckets(offset int, count int) {
+ if count > c.size {
+ count = c.size
+ }
+ for i := 0; i < count; i++ {
+ c.ResetBucket(offset + i)
+ }
+}
diff --git a/cluster/metrics/rolling/sliding_window_counter_metrics.go
b/cluster/metrics/rolling/sliding_window_counter_metrics.go
new file mode 100644
index 000000000..87b4b2c3d
--- /dev/null
+++ b/cluster/metrics/rolling/sliding_window_counter_metrics.go
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rolling
+
+import (
+ "sync"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/cluster/metrics/utils"
+ "dubbo.apache.org/dubbo-go/v3/common"
+)
+
+// Compile-time check that *SlidingWindowCounterMetrics implements Metrics.
+var _ Metrics = (*SlidingWindowCounterMetrics)(nil)
+
+// SlidingWindowCounterMetrics keeps one SlidingWindowCounter per method
+// metrics key.
+type SlidingWindowCounterMetrics struct {
+ // opts is passed to NewSlidingWindowCounter when a counter is created.
+ opts SlidingWindowCounterOpts
+ // metrics maps a method metrics key to its *SlidingWindowCounter.
+ metrics sync.Map
+}
+
+// NewSlidingWindowCounterMetrics returns an empty SlidingWindowCounterMetrics
+// that keeps opts for the counters it creates.
+func NewSlidingWindowCounterMetrics(opts SlidingWindowCounterOpts) *SlidingWindowCounterMetrics {
+	m := new(SlidingWindowCounterMetrics)
+	m.opts = opts
+	return m
+}
+
+// GetMethodMetrics returns the current value of the counter stored for the
+// given url, methodName and key. It returns utils.ErrMetricsNotFound when no
+// counter is stored for them.
+func (m *SlidingWindowCounterMetrics) GetMethodMetrics(url *common.URL, methodName, key string) (float64, error) {
+	counter, found := m.metrics.Load(utils.GetMethodMetricsKey(url, methodName, key))
+	if !found {
+		return 0, utils.ErrMetricsNotFound
+	}
+	return counter.(*SlidingWindowCounter).Value(), nil
+}
+
+// AppendMethodMetrics appends val to the counter stored for the given url,
+// methodName and key, creating the counter from m.opts on first use. It always
+// returns nil.
+func (m *SlidingWindowCounterMetrics) AppendMethodMetrics(url *common.URL, methodName, key string, val float64) error {
+	metricsKey := utils.GetMethodMetricsKey(url, methodName, key)
+	counter, found := m.metrics.Load(metricsKey)
+	if !found {
+		// LoadOrStore keeps whichever counter was stored first if another
+		// goroutine stored one in the meantime.
+		counter, _ = m.metrics.LoadOrStore(metricsKey, NewSlidingWindowCounter(m.opts))
+	}
+	counter.(*SlidingWindowCounter).Append(val)
+	return nil
+}
diff --git a/cluster/metrics/utils.go b/cluster/metrics/utils/utils.go
similarity index 53%
rename from cluster/metrics/utils.go
rename to cluster/metrics/utils/utils.go
index 65cc9e249..e84312b17 100644
--- a/cluster/metrics/utils.go
+++ b/cluster/metrics/utils/utils.go
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package metrics
+package utils
import (
+ "errors"
"fmt"
)
@@ -25,10 +26,50 @@ import (
"dubbo.apache.org/dubbo-go/v3/common"
)
-func getInvokerKey(url *common.URL) string {
+var (
+ // ErrMetricsNotFound reports that the requested metrics do not exist.
+ ErrMetricsNotFound = errors.New("metrics not found")
+)
+
+// GetInvokerKey returns the invoker key for url, which is its Path.
+func GetInvokerKey(url *common.URL) string {
return url.Path
}
-func getInstanceKey(url *common.URL) string {
+// GetInstanceKey returns the instance key for url, formatted as "Ip:Port".
+func GetInstanceKey(url *common.URL) string {
return fmt.Sprintf("%s:%s", url.Ip, url.Port)
}
+
+// ToFloat64 converts i to a float64. It accepts float32, float64 and every
+// built-in signed and unsigned integer type, including int. Any other value,
+// including nil, yields 0.
+func ToFloat64(i interface{}) float64 {
+	switch s := i.(type) {
+	case float64:
+		return s
+	case float32:
+		return float64(s)
+	case int:
+		// Untyped integer constants and most counters arrive as int.
+		return float64(s)
+	case int64:
+		return float64(s)
+	case int32:
+		return float64(s)
+	case int16:
+		return float64(s)
+	case int8:
+		return float64(s)
+	case uint:
+		return float64(s)
+	case uint64:
+		return float64(s)
+	case uint32:
+		return float64(s)
+	case uint16:
+		return float64(s)
+	case uint8:
+		return float64(s)
+	default:
+		// A nil interface also ends up here.
+		return 0
+	}
+}
+
+// GetMethodMetricsKey builds the key under which a method-level metric is
+// stored: the instance key, the invoker key, methodName and key, joined by
+// dots.
+func GetMethodMetricsKey(url *common.URL, methodName, key string) string {
+	instance := GetInstanceKey(url)
+	invoker := GetInvokerKey(url)
+	return fmt.Sprintf("%s.%s.%s.%s", instance, invoker, methodName, key)
+}
diff --git a/cluster/utils/adaptivesvc.go b/cluster/utils/adaptivesvc.go
index e92f549ec..04edec20b 100644
--- a/cluster/utils/adaptivesvc.go
+++ b/cluster/utils/adaptivesvc.go
@@ -18,6 +18,8 @@
package utils
import (
+ "context"
+ "errors"
"fmt"
"strings"
)
@@ -44,3 +46,10 @@ func IsAdaptiveServiceFailed(err error) bool {
}
return strings.HasPrefix(err.Error(),
adaptivesvc.ErrAdaptiveSvcInterrupted.Error())
}
+
+// IsDeadlineExceeded reports whether err is non-nil and matches
+// context.DeadlineExceeded according to errors.Is.
+func IsDeadlineExceeded(err error) bool {
+	return err != nil && errors.Is(err, context.DeadlineExceeded)
+}