This is an automated email from the ASF dual-hosted git repository.
gehafearless pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 38fa72ecf feat(collector): introduce the hotspot detector into go-collector (#1943)
38fa72ecf is described below
commit 38fa72ecf767ac141954d00f34ecb0b8cd15222b
Author: Dan Wang <[email protected]>
AuthorDate: Thu Mar 14 11:46:55 2024 +0800
feat(collector): introduce the hotspot detector into go-collector (#1943)
---
collector/avail/detector.go | 4 +--
collector/config.yml | 3 ++
.../hotspot/{algo.go => partition_detector.go} | 40 ++++++++++++++++++++++
collector/main.go | 27 ++++++++++++---
collector/metrics/metric_collector.go | 4 +--
5 files changed, 69 insertions(+), 9 deletions(-)
diff --git a/collector/avail/detector.go b/collector/avail/detector.go
index 2b93da9f2..d6ddfc27a 100644
--- a/collector/avail/detector.go
+++ b/collector/avail/detector.go
@@ -34,7 +34,7 @@ import (
// Detector periodically checks the service availability of the Pegasus cluster.
type Detector interface {
- Start(tom *tomb.Tomb) error
+ Run(tom *tomb.Tomb) error
}
// NewDetector returns a service-availability detector.
@@ -96,7 +96,7 @@ type pegasusDetector struct {
partitionCount int
}
-func (d *pegasusDetector) Start(tom *tomb.Tomb) error {
+func (d *pegasusDetector) Run(tom *tomb.Tomb) error {
var err error
// Open the detect table.
d.detectTable, err = d.client.OpenTable(context.Background(), d.detectTableName)
diff --git a/collector/config.yml b/collector/config.yml
index 1ff2d10e9..5075cf8d5 100644
--- a/collector/config.yml
+++ b/collector/config.yml
@@ -44,3 +44,6 @@ falcon_agent:
availablity_detect:
table_name : test
+
+hotspot:
+ partition_detect_interval : 10s
diff --git a/collector/hotspot/algo.go b/collector/hotspot/partition_detector.go
similarity index 54%
rename from collector/hotspot/algo.go
rename to collector/hotspot/partition_detector.go
index 6b24419cf..a1b55ef5a 100644
--- a/collector/hotspot/algo.go
+++ b/collector/hotspot/partition_detector.go
@@ -16,3 +16,43 @@
// under the License.
package hotspot
+
+import (
+ "time"
+
+ log "github.com/sirupsen/logrus"
+ "gopkg.in/tomb.v2"
+)
+
+type PartitionDetector interface {
+ Run(tom *tomb.Tomb) error
+}
+
+type PartitionDetectorConfig struct {
+ DetectInterval time.Duration
+}
+
+func NewPartitionDetector(conf PartitionDetectorConfig) PartitionDetector {
+ return &partitionDetector{
+ detectInterval: conf.DetectInterval,
+ }
+}
+
+type partitionDetector struct {
+ detectInterval time.Duration
+}
+
+func (d *partitionDetector) Run(tom *tomb.Tomb) error {
+ for {
+ select {
+ case <-time.After(d.detectInterval):
+ d.detect()
+ case <-tom.Dying():
+ log.Info("Hotspot partition detector exited.")
+ return nil
+ }
+ }
+}
+
+func (d *partitionDetector) detect() {
+}
diff --git a/collector/main.go b/collector/main.go
index efc2fc98b..7936036d5 100644
--- a/collector/main.go
+++ b/collector/main.go
@@ -27,6 +27,7 @@ import (
"syscall"
"github.com/apache/incubator-pegasus/collector/avail"
+ "github.com/apache/incubator-pegasus/collector/hotspot"
"github.com/apache/incubator-pegasus/collector/metrics"
"github.com/apache/incubator-pegasus/collector/webui"
"github.com/prometheus/client_golang/prometheus"
@@ -87,18 +88,34 @@ func main() {
tom := &tomb.Tomb{}
setupSignalHandler(func() {
- tom.Kill(errors.New("collector terminates")) // kill other goroutines
+ tom.Kill(errors.New("Collector terminates")) // kill other goroutines
})
+
tom.Go(func() error {
// Set detect inteverl and detect timeout 10s.
- return avail.NewDetector(10000000000, 10000000000, 16).Start(tom)
+ return avail.NewDetector(10000000000, 10000000000, 16).Run(tom)
})
+
tom.Go(func() error {
- return metrics.NewMetaServerMetricCollector().Start(tom)
+ return metrics.NewMetaServerMetricCollector().Run(tom)
})
+
tom.Go(func() error {
- return metrics.NewReplicaServerMetricCollector().Start(tom)
+ return metrics.NewReplicaServerMetricCollector().Run(tom)
})
- <-tom.Dead() // gracefully wait until all goroutines dead
+ tom.Go(func() error {
+ conf := hotspot.PartitionDetectorConfig{
+ DetectInterval: viper.GetDuration("hotspot.partition_detect_interval"),
+ }
+ return hotspot.NewPartitionDetector(conf).Run(tom)
+ })
+
+ err := tom.Wait()
+ if err != nil {
+ log.Error("Collector exited abnormally:", err)
+ return
+ }
+
+ log.Info("Collector exited normally.")
}
diff --git a/collector/metrics/metric_collector.go b/collector/metrics/metric_collector.go
index 9e6f57bbb..6f7fcee78 100644
--- a/collector/metrics/metric_collector.go
+++ b/collector/metrics/metric_collector.go
@@ -56,7 +56,7 @@ var SummaryMetricsMap map[string]prometheus.Summary
var TableNameByID map[string]string
type MetricCollector interface {
- Start(tom *tomb.Tomb) error
+ Run(tom *tomb.Tomb) error
}
func NewMetricCollector(
@@ -79,7 +79,7 @@ type Collector struct {
role string
}
-func (collector *Collector) Start(tom *tomb.Tomb) error {
+func (collector *Collector) Run(tom *tomb.Tomb) error {
ticker := time.NewTicker(collector.detectInterval)
for {
select {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]