This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 3ab03aa19 feat(collector): clean up expired tables during hotspot detection (#2369)
3ab03aa19 is described below
commit 3ab03aa196baf344470391fa149675cb9b2b73f0
Author: Dan Wang <[email protected]>
AuthorDate: Sat Mar 21 00:08:20 2026 +0800
feat(collector): clean up expired tables during hotspot detection (#2369)
https://github.com/apache/incubator-pegasus/issues/2358
Currently, when collecting hotspot-related metrics, the Go collector only adds
new tables and never removes them. If a large number of tables are deleted, the
collector may still retain those deleted tables in memory, leading to excessive
memory usage.
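This PR introduces a table cleanup mechanism: if no metric data has been
collected for a table over a prolonged period, the table will be removed to
release the memory it occupies. The period is configured by the new
`hotspot.retention_period` option (24h in the bundled config.yml; it must be
greater than 0).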
---
collector/config.yml | 33 ++++++++-------
collector/go.mod | 2 +-
collector/go.sum | 4 +-
collector/hotspot/partition_detector.go | 75 ++++++++++++++++++++++++++++++---
4 files changed, 90 insertions(+), 24 deletions(-)
diff --git a/collector/config.yml b/collector/config.yml
index d489ab0b7..8847629dd 100644
--- a/collector/config.yml
+++ b/collector/config.yml
@@ -16,7 +16,7 @@
# under the License.
# the cluster that this collector is binding
-cluster_name : "onebox"
+cluster_name: "onebox"
# the meta server addresses of the cluster.
meta_servers:
@@ -25,33 +25,34 @@ meta_servers:
- 127.0.0.1:34603
# local server port
-port : 34101
+port: 34101
metrics:
# use falcon as monitoring system.
- sink : falcon
- report_interval : 10s
+ sink: falcon
+ report_interval: 10s
prometheus:
# the exposed port for prometheus exposer
- exposer_port : 1111
+ exposer_port: 1111
falcon_agent:
# the host IP of falcon agent
- host : "127.0.0.1"
- port : 1988
- http_path : "/v1/push"
+ host: "127.0.0.1"
+ port: 1988
+ http_path: "/v1/push"
availability_detect:
- table_name : test
- partition_count : 16
- max_replica_count : 3
+ table_name: test
+ partition_count: 16
+ max_replica_count: 3
hotspot:
- rpc_timeout : 5s
- partition_detect_interval : 30s
- pull_metrics_timeout : 5s
- sample_metrics_interval : 10s
- max_sample_size : 128
+ retention_period: 24h
+ rpc_timeout: 5s
+ partition_detect_interval: 30s
+ pull_metrics_timeout: 5s
+ sample_metrics_interval: 10s
+ max_sample_size: 128
hotspot_partition_min_score: 3
hotspot_partition_min_qps: 100
diff --git a/collector/go.mod b/collector/go.mod
index 5aabb0b16..58aa51cf8 100644
--- a/collector/go.mod
+++ b/collector/go.mod
@@ -20,7 +20,7 @@ module github.com/apache/incubator-pegasus/collector
go 1.18
require (
- github.com/apache/incubator-pegasus/go-client
v0.0.0-20260121121155-96868ed93b2a
+ github.com/apache/incubator-pegasus/go-client
v0.0.0-20260211095029-022854b0259f
github.com/gammazero/deque v1.0.0
github.com/kataras/iris/v12 v12.2.0
github.com/prometheus/client_golang v1.18.0
diff --git a/collector/go.sum b/collector/go.sum
index 95ebed26d..59e6276e4 100644
--- a/collector/go.sum
+++ b/collector/go.sum
@@ -35,8 +35,8 @@ github.com/alecthomas/template
v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod
h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/andybalholm/brotli v1.0.5
h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod
h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
-github.com/apache/incubator-pegasus/go-client
v0.0.0-20260121121155-96868ed93b2a
h1:Vqws5uoQ/ibw4QcnDHdXIleiGunC1QmZaMCrJN0znEk=
-github.com/apache/incubator-pegasus/go-client
v0.0.0-20260121121155-96868ed93b2a/go.mod
h1:SQnz/3Qg6uH1tfl3MKmiYwNk+i5CZiMD9AtMOTZkpgw=
+github.com/apache/incubator-pegasus/go-client
v0.0.0-20260211095029-022854b0259f
h1:Q9jSLZZCsD8tdU8h+qFe6PN5DPqWfiezkfK/8l16i7Y=
+github.com/apache/incubator-pegasus/go-client
v0.0.0-20260211095029-022854b0259f/go.mod
h1:SQnz/3Qg6uH1tfl3MKmiYwNk+i5CZiMD9AtMOTZkpgw=
github.com/apache/thrift v0.13.0
h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI=
github.com/apache/thrift v0.13.0/go.mod
h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod
h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
diff --git a/collector/hotspot/partition_detector.go
b/collector/hotspot/partition_detector.go
index 8e0e0a8be..1a8c6277f 100644
--- a/collector/hotspot/partition_detector.go
+++ b/collector/hotspot/partition_detector.go
@@ -42,6 +42,7 @@ type PartitionDetector interface {
type PartitionDetectorConfig struct {
MetaServers []string
+ RetentionPeriod time.Duration
RpcTimeout time.Duration
DetectInterval time.Duration
PullMetricsTimeout time.Duration
@@ -54,6 +55,7 @@ type PartitionDetectorConfig struct {
func LoadPartitionDetectorConfig() *PartitionDetectorConfig {
return &PartitionDetectorConfig{
MetaServers: viper.GetStringSlice("meta_servers"),
+ RetentionPeriod:
viper.GetDuration("hotspot.retention_period"),
RpcTimeout:
viper.GetDuration("hotspot.rpc_timeout"),
DetectInterval:
viper.GetDuration("hotspot.partition_detect_interval"),
PullMetricsTimeout:
viper.GetDuration("hotspot.pull_metrics_timeout"),
@@ -69,6 +71,10 @@ func NewPartitionDetector(cfg *PartitionDetectorConfig)
(PartitionDetector, erro
return nil, fmt.Errorf("MetaServers should not be empty")
}
+ if cfg.RetentionPeriod <= 0 {
+ return nil, fmt.Errorf("RetentionPeriod(%d) must be > 0",
cfg.RetentionPeriod)
+ }
+
if cfg.DetectInterval <= 0 {
return nil, fmt.Errorf("DetectInterval(%d) must be > 0",
cfg.DetectInterval)
}
@@ -111,6 +117,12 @@ type partitionDetectorImpl struct {
}
func (d *partitionDetectorImpl) Run(tom *tomb.Tomb) error {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go d.checkExpiration(ctx, &wg)
+
ticker := time.NewTicker(d.cfg.DetectInterval)
defer ticker.Stop()
@@ -119,12 +131,50 @@ func (d *partitionDetectorImpl) Run(tom *tomb.Tomb) error
{
case <-ticker.C:
d.detect()
case <-tom.Dying():
+ cancel()
+ wg.Wait()
+
log.Info("Hotspot partition detector exited.")
return nil
}
}
}
+func (d *partitionDetectorImpl) checkExpiration(ctx context.Context, wg
*sync.WaitGroup) {
+ defer wg.Done()
+
+ ticker := time.NewTicker(d.cfg.RetentionPeriod)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ticker.C:
+ d.retireExpiredTables()
+
+ case <-ctx.Done():
+ log.Info("Expiration checker for hotspot exited.")
+ return
+ }
+ }
+}
+
+func (d *partitionDetectorImpl) retireExpiredTables() {
+ currentTimestampSeconds := time.Now().Unix()
+
+ d.mtx.Lock()
+ defer d.mtx.Unlock()
+
+ log.Info("check expired tables")
+
+ for key, analyzer := range d.analyzers {
+ if !analyzer.isExpired(currentTimestampSeconds) {
+ continue
+ }
+
+ delete(d.analyzers, key)
+ }
+}
+
func (d *partitionDetectorImpl) detect() {
appMap, err := d.aggregate()
if err != nil {
@@ -369,10 +419,8 @@ func calculateStats(
}
// Only primary replica of a partition will be counted.
- // TODO(wangdan): support Equal() for base.HostPort.
primary := stats.partitionConfigs[partitionID].HpPrimary
- if primary.GetHost() != node.HpNode.GetHost() ||
- primary.GetPort() != node.HpNode.GetPort() {
+ if !node.HpNode.Equal(primary) {
continue
}
@@ -439,6 +487,10 @@ func calculateHotspotStats(appMap appStatsMap)
map[partitionAnalyzerKey][]hotspo
func (d *partitionDetectorImpl) analyse(appMap appStatsMap) {
hotspotMap := calculateHotspotStats(appMap)
+ nowTime := time.Now()
+ expireTime := nowTime.Add(d.cfg.RetentionPeriod)
+ expireTimestampSeconds := expireTime.Unix()
+
d.mtx.Lock()
defer d.mtx.Unlock()
@@ -455,7 +507,7 @@ func (d *partitionDetectorImpl) analyse(appMap appStatsMap)
{
d.analyzers[key] = analyzer
}
- analyzer.add(value)
+ analyzer.add(value, expireTimestampSeconds)
// Perform the analysis asynchronously.
go analyzer.analyse()
@@ -489,13 +541,26 @@ type partitionAnalyzer struct {
appID int32
partitionCount int32
mtx sync.RWMutex
+ expireTimestampSeconds int64
samples deque.Deque[[]hotspotPartitionStats] // Each
element is a sample of all partitions of the table
}
-func (a *partitionAnalyzer) add(sample []hotspotPartitionStats) {
+func (a *partitionAnalyzer) isExpired(currentTimestampSeconds int64) bool {
+ a.mtx.RLock()
+ defer a.mtx.RUnlock()
+
+ return currentTimestampSeconds >= a.expireTimestampSeconds
+}
+
+func (a *partitionAnalyzer) add(
+ sample []hotspotPartitionStats,
+ expireTimestampSeconds int64,
+) {
a.mtx.Lock()
defer a.mtx.Unlock()
+ a.expireTimestampSeconds = expireTimestampSeconds
+
for a.samples.Len() >= a.maxSampleSize {
a.samples.PopFront()
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]