This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 5f72883bc feat: wrapper sliding window with custom aggregator (#2358)
5f72883bc is described below

commit 5f72883bce073b0697a278126edcd6ec4b0a6dc0
Author: Wang Guan <[email protected]>
AuthorDate: Wed Jul 19 09:53:02 2023 +0800

    feat: wrapper sliding window with custom aggregator (#2358)
---
 metrics/util/aggregate/aggregator.go      | 133 ++++++++++++++++++++++++++++++
 metrics/util/aggregate/aggregator_test.go |  85 +++++++++++++++++++
 metrics/util/aggregate/quantile.go        |   1 +
 metrics/util/aggregate/sliding_window.go  |   4 +-
 4 files changed, 221 insertions(+), 2 deletions(-)

diff --git a/metrics/util/aggregate/aggregator.go 
b/metrics/util/aggregate/aggregator.go
new file mode 100644
index 000000000..0f31f3fc0
--- /dev/null
+++ b/metrics/util/aggregate/aggregator.go
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package aggregate
+
+import (
+       "math"
+       "sync"
+       "time"
+)
+
+// TimeWindowAggregator wraps a sliding window to aggregate data.
+//
+// It is concurrent-safe.
+// It uses the custom aggregator struct to aggregate data.
+// The window is divided into several panes, and each pane's value is an aggregator instance.
+type TimeWindowAggregator struct {
+       window *slidingWindow // underlying sliding window; pane values are read back as *aggregator
+       mux    sync.RWMutex   // guards window: Result takes the read lock, Add takes the write lock
+}
+
+// NewTimeWindowAggregator returns a TimeWindowAggregator backed by a new
+// sliding window.
+//
+// paneCount is passed to newSlidingWindow unchanged. timeWindowSeconds is the
+// window length in seconds; it is converted to milliseconds before being
+// passed on.
+func NewTimeWindowAggregator(paneCount int, timeWindowSeconds int64) *TimeWindowAggregator {
+       return &TimeWindowAggregator{
+               window: newSlidingWindow(paneCount, timeWindowSeconds*1000),
+       }
+}
+
+// Result is the aggregate returned by TimeWindowAggregator.Result.
+//
+// TimeWindowAggregator.Result leaves every field at zero when no value was
+// counted in the window.
+type Result struct {
+       Total float64 // sum of the aggregated values
+       Min   float64 // smallest aggregated value
+       Max   float64 // largest aggregated value
+       Avg   float64 // Total divided by Count
+       Count uint64  // number of aggregated values
+}
+
+// Result returns the aggregate result of the sliding window by aggregating all panes.
+//
+// It holds the read lock while folding every pane value that the window
+// reports for the current time. When no value has been counted, the returned
+// Result is all zeros.
+func (t *TimeWindowAggregator) Result() *Result {
+       t.mux.RLock()
+       defer t.mux.RUnlock()
+
+       res := &Result{}
+
+       total := 0.0
+       count := uint64(0)
+       // Seed max with the lowest finite float64. math.SmallestNonzeroFloat64 is
+       // the smallest POSITIVE value, so it would win against zero and negative
+       // inputs and be reported as the maximum.
+       max := -math.MaxFloat64
+       min := math.MaxFloat64
+
+       for _, v := range t.window.values(time.Now().UnixMilli()) {
+               agg := v.(*aggregator)
+               total += agg.total
+               count += agg.count
+               max = math.Max(max, agg.max)
+               min = math.Min(min, agg.min)
+       }
+
+       // Only publish the figures when something was counted; otherwise the
+       // sentinels above would leak into the result.
+       if count > 0 {
+               res.Avg = total / float64(count)
+               res.Count = count
+               res.Total = total
+               res.Max = max
+               res.Min = min
+       }
+
+       return res
+}
+
+// Add adds a value to the sliding window's current pane.
+//
+// It holds the write lock because the pane's aggregator is mutated.
+func (t *TimeWindowAggregator) Add(v float64) {
+       t.mux.Lock()
+       defer t.mux.Unlock()
+
+       t.window.currentPane(time.Now().UnixMilli(), t.newEmptyValue).value.(*aggregator).add(v)
+}
+
+// newEmptyValue is the factory handed to currentPane; it returns a fresh,
+// empty aggregator.
+func (t *TimeWindowAggregator) newEmptyValue() interface{} {
+       return newAggregator()
+}
+
+// aggregator is a custom struct to aggregate data.
+//
+// It is NOT concurrent-safe.
+// It aggregates data by calculating the min, max, total and count.
+type aggregator struct {
+       min   float64
+       max   float64
+       total float64
+       count uint64
+}
+
+// newAggregator returns an empty aggregator.
+//
+// min starts at the highest finite float64 and max at the lowest, so the
+// first added value replaces both, including zero and negative values.
+func newAggregator() *aggregator {
+       return &aggregator{
+               min:   math.MaxFloat64,
+               max:   -math.MaxFloat64,
+               total: float64(0),
+               count: uint64(0),
+       }
+}
+
+// add records v by updating min, max and total, then incrementing count.
+func (a *aggregator) add(v float64) {
+       a.updateMin(v)
+       a.updateMax(v)
+       a.updateTotal(v)
+       a.updateCount()
+}
+
+// updateMin sets min to the smaller of the current min and v, using math.Min.
+func (a *aggregator) updateMin(v float64) {
+       a.min = math.Min(a.min, v)
+}
+
+// updateMax sets max to the larger of the current max and v, using math.Max.
+func (a *aggregator) updateMax(v float64) {
+       a.max = math.Max(a.max, v)
+}
+
+// updateTotal adds v to the running total.
+func (a *aggregator) updateTotal(v float64) {
+       a.total += v
+}
+
+// updateCount increments the number of recorded values by one.
+func (a *aggregator) updateCount() {
+       a.count++
+}
diff --git a/metrics/util/aggregate/aggregator_test.go 
b/metrics/util/aggregate/aggregator_test.go
new file mode 100644
index 000000000..d3fa4f644
--- /dev/null
+++ b/metrics/util/aggregate/aggregator_test.go
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package aggregate
+
+import (
+       "math/rand"
+       "reflect"
+       "sync"
+       "testing"
+)
+
+// TestTimeWindowAggregatorAddAndResult adds 10, 20 and 30 to a 10-pane,
+// 1-second aggregator and checks the Result against the expected figures.
+//
+// NOTE(review): presumably relies on the three Add calls and the Result call
+// all landing inside the same 1-second window; confirm on slow runners.
+func TestTimeWindowAggregatorAddAndResult(t *testing.T) {
+       timeWindowAggregator := NewTimeWindowAggregator(10, 1)
+       timeWindowAggregator.Add(10)
+       timeWindowAggregator.Add(20)
+       timeWindowAggregator.Add(30)
+
+       tests := []struct {
+               name string
+               want *Result
+       }{
+               {
+                       name: "Result",
+                       want: &Result{
+                               Total: 60,
+                               Min:   10,
+                               Max:   30,
+                               Avg:   20,
+                               Count: 3,
+                       },
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       if got := timeWindowAggregator.Result(); 
!reflect.DeepEqual(got, tt.want) {
+                               t.Errorf("Result() = %v, want %v", got, tt.want)
+                       }
+               })
+       }
+}
+
+// BenchmarkTimeWindowAggregatorAdd starts one goroutine per iteration, each
+// adding a random value in [0, 100), and waits for all of them to finish.
+func BenchmarkTimeWindowAggregatorAdd(b *testing.B) {
+       wg := sync.WaitGroup{}
+       tw := NewTimeWindowAggregator(10, 1)
+       for i := 0; i < b.N; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       tw.Add(rand.Float64() * 100)
+               }()
+       }
+       wg.Wait()
+}
+
+// BenchmarkTimeWindowAggregatorResult measures Result while Add runs
+// concurrently against the same aggregator.
+//
+// Each iteration starts one Add goroutine and one Result goroutine. Both are
+// registered with the WaitGroup so that no Add is still running when the
+// benchmark returns.
+func BenchmarkTimeWindowAggregatorResult(b *testing.B) {
+       wg := sync.WaitGroup{}
+       tw := NewTimeWindowAggregator(10, 1)
+       for i := 0; i < b.N; i++ {
+               wg.Add(2)
+               go func() {
+                       defer wg.Done()
+                       tw.Add(rand.Float64() * 100)
+               }()
+               go func() {
+                       defer wg.Done()
+                       tw.Result()
+               }()
+       }
+       wg.Wait()
+}
diff --git a/metrics/util/aggregate/quantile.go 
b/metrics/util/aggregate/quantile.go
index 1a78ba58c..c5aa4fd37 100644
--- a/metrics/util/aggregate/quantile.go
+++ b/metrics/util/aggregate/quantile.go
@@ -28,6 +28,7 @@ import (
 
 // TimeWindowQuantile wrappers sliding window around T-Digest.
 //
+// It is concurrent-safe.
 // It uses T-Digest algorithm to calculate quantile.
 // The window is divided into several panes, and each pane's value is a 
TDigest instance.
 type TimeWindowQuantile struct {
diff --git a/metrics/util/aggregate/sliding_window.go 
b/metrics/util/aggregate/sliding_window.go
index feda9219e..42e2f083a 100644
--- a/metrics/util/aggregate/sliding_window.go
+++ b/metrics/util/aggregate/sliding_window.go
@@ -19,7 +19,7 @@ package aggregate
 
 // SlidingWindow adopts sliding window algorithm for statistics.
 //
-// It is not thread-safe.
+// It is NOT concurrent-safe.
 // A window contains paneCount panes.
 // intervalInMs = paneCount * paneIntervalInMs.
 type slidingWindow struct {
@@ -61,7 +61,7 @@ func (s *slidingWindow) isPaneDeprecated(pane *pane, 
timeMillis int64) bool {
        return timeMillis-pane.startInMs > s.intervalInMs
 }
 
-// currentPane get the pane at the specified timestamp in milliseconds.
+// currentPane gets the pane at the specified timestamp or creates a new one if the pane is deprecated.
 func (s *slidingWindow) currentPane(timeMillis int64, newEmptyValue func() 
interface{}) *pane {
        if timeMillis < 0 {
                return nil

Reply via email to