This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ab03aa19 feat(collector): clean up expired tables during hotspot detection (#2369)
3ab03aa19 is described below

commit 3ab03aa196baf344470391fa149675cb9b2b73f0
Author: Dan Wang <[email protected]>
AuthorDate: Sat Mar 21 00:08:20 2026 +0800

    feat(collector): clean up expired tables during hotspot detection (#2369)
    
    https://github.com/apache/incubator-pegasus/issues/2358
    
    Currently, when collecting hotspot-related metrics, the Go collector
    only adds new tables. If a large number of tables are deleted, the
    collector may still retain those deleted tables in memory, leading to
    excessive memory usage.
    
    This PR introduces a table cleanup mechanism: if no metric data has
    been collected for a table over a prolonged period, the table will be
    removed to release the memory it occupies.
---
 collector/config.yml                    | 33 ++++++++-------
 collector/go.mod                        |  2 +-
 collector/go.sum                        |  4 +-
 collector/hotspot/partition_detector.go | 75 ++++++++++++++++++++++++++++++---
 4 files changed, 90 insertions(+), 24 deletions(-)

diff --git a/collector/config.yml b/collector/config.yml
index d489ab0b7..8847629dd 100644
--- a/collector/config.yml
+++ b/collector/config.yml
@@ -16,7 +16,7 @@
 # under the License.
 
 # the cluster that this collector is binding
-cluster_name : "onebox"
+cluster_name: "onebox"
 
 # the meta server addresses of the cluster.
 meta_servers:
@@ -25,33 +25,34 @@ meta_servers:
   - 127.0.0.1:34603
 
 # local server port
-port : 34101
+port: 34101
 
 metrics:
   # use falcon as monitoring system.
-  sink : falcon
-  report_interval : 10s
+  sink: falcon
+  report_interval: 10s
 
 prometheus:
   # the exposed port for prometheus exposer
-  exposer_port : 1111
+  exposer_port: 1111
 
 falcon_agent:
   # the host IP of falcon agent
-  host : "127.0.0.1"
-  port : 1988
-  http_path : "/v1/push"
+  host: "127.0.0.1"
+  port: 1988
+  http_path: "/v1/push"
 
 availability_detect:
-  table_name : test
-  partition_count : 16
-  max_replica_count : 3
+  table_name: test
+  partition_count: 16
+  max_replica_count: 3
 
 hotspot:
-  rpc_timeout : 5s
-  partition_detect_interval : 30s
-  pull_metrics_timeout : 5s
-  sample_metrics_interval : 10s
-  max_sample_size : 128
+  retention_period: 24h
+  rpc_timeout: 5s
+  partition_detect_interval: 30s
+  pull_metrics_timeout: 5s
+  sample_metrics_interval: 10s
+  max_sample_size: 128
   hotspot_partition_min_score: 3
   hotspot_partition_min_qps: 100
diff --git a/collector/go.mod b/collector/go.mod
index 5aabb0b16..58aa51cf8 100644
--- a/collector/go.mod
+++ b/collector/go.mod
@@ -20,7 +20,7 @@ module github.com/apache/incubator-pegasus/collector
 go 1.18
 
 require (
-       github.com/apache/incubator-pegasus/go-client 
v0.0.0-20260121121155-96868ed93b2a
+       github.com/apache/incubator-pegasus/go-client 
v0.0.0-20260211095029-022854b0259f
        github.com/gammazero/deque v1.0.0
        github.com/kataras/iris/v12 v12.2.0
        github.com/prometheus/client_golang v1.18.0
diff --git a/collector/go.sum b/collector/go.sum
index 95ebed26d..59e6276e4 100644
--- a/collector/go.sum
+++ b/collector/go.sum
@@ -35,8 +35,8 @@ github.com/alecthomas/template 
v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod 
h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
 github.com/andybalholm/brotli v1.0.5 
h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
 github.com/andybalholm/brotli v1.0.5/go.mod 
h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
-github.com/apache/incubator-pegasus/go-client 
v0.0.0-20260121121155-96868ed93b2a 
h1:Vqws5uoQ/ibw4QcnDHdXIleiGunC1QmZaMCrJN0znEk=
-github.com/apache/incubator-pegasus/go-client 
v0.0.0-20260121121155-96868ed93b2a/go.mod 
h1:SQnz/3Qg6uH1tfl3MKmiYwNk+i5CZiMD9AtMOTZkpgw=
+github.com/apache/incubator-pegasus/go-client 
v0.0.0-20260211095029-022854b0259f 
h1:Q9jSLZZCsD8tdU8h+qFe6PN5DPqWfiezkfK/8l16i7Y=
+github.com/apache/incubator-pegasus/go-client 
v0.0.0-20260211095029-022854b0259f/go.mod 
h1:SQnz/3Qg6uH1tfl3MKmiYwNk+i5CZiMD9AtMOTZkpgw=
 github.com/apache/thrift v0.13.0 
h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI=
 github.com/apache/thrift v0.13.0/go.mod 
h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
 github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod 
h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
diff --git a/collector/hotspot/partition_detector.go 
b/collector/hotspot/partition_detector.go
index 8e0e0a8be..1a8c6277f 100644
--- a/collector/hotspot/partition_detector.go
+++ b/collector/hotspot/partition_detector.go
@@ -42,6 +42,7 @@ type PartitionDetector interface {
 
 type PartitionDetectorConfig struct {
        MetaServers              []string
+       RetentionPeriod          time.Duration
        RpcTimeout               time.Duration
        DetectInterval           time.Duration
        PullMetricsTimeout       time.Duration
@@ -54,6 +55,7 @@ type PartitionDetectorConfig struct {
 func LoadPartitionDetectorConfig() *PartitionDetectorConfig {
        return &PartitionDetectorConfig{
                MetaServers:              viper.GetStringSlice("meta_servers"),
+               RetentionPeriod:          
viper.GetDuration("hotspot.retention_period"),
                RpcTimeout:               
viper.GetDuration("hotspot.rpc_timeout"),
                DetectInterval:           
viper.GetDuration("hotspot.partition_detect_interval"),
                PullMetricsTimeout:       
viper.GetDuration("hotspot.pull_metrics_timeout"),
@@ -69,6 +71,10 @@ func NewPartitionDetector(cfg *PartitionDetectorConfig) 
(PartitionDetector, erro
                return nil, fmt.Errorf("MetaServers should not be empty")
        }
 
+       if cfg.RetentionPeriod <= 0 {
+               return nil, fmt.Errorf("RetentionPeriod(%d) must be > 0", 
cfg.RetentionPeriod)
+       }
+
        if cfg.DetectInterval <= 0 {
                return nil, fmt.Errorf("DetectInterval(%d) must be > 0", 
cfg.DetectInterval)
        }
@@ -111,6 +117,12 @@ type partitionDetectorImpl struct {
 }
 
 func (d *partitionDetectorImpl) Run(tom *tomb.Tomb) error {
+       ctx, cancel := context.WithCancel(context.Background())
+
+       var wg sync.WaitGroup
+       wg.Add(1)
+       go d.checkExpiration(ctx, &wg)
+
        ticker := time.NewTicker(d.cfg.DetectInterval)
        defer ticker.Stop()
 
@@ -119,12 +131,50 @@ func (d *partitionDetectorImpl) Run(tom *tomb.Tomb) error 
{
                case <-ticker.C:
                        d.detect()
                case <-tom.Dying():
+                       cancel()
+                       wg.Wait()
+
                        log.Info("Hotspot partition detector exited.")
                        return nil
                }
        }
 }
 
+func (d *partitionDetectorImpl) checkExpiration(ctx context.Context, wg 
*sync.WaitGroup) {
+       defer wg.Done()
+
+       ticker := time.NewTicker(d.cfg.RetentionPeriod)
+       defer ticker.Stop()
+
+       for {
+               select {
+               case <-ticker.C:
+                       d.retireExpiredTables()
+
+               case <-ctx.Done():
+                       log.Info("Expiration checker for hotspot exited.")
+                       return
+               }
+       }
+}
+
+func (d *partitionDetectorImpl) retireExpiredTables() {
+       currentTimestampSeconds := time.Now().Unix()
+
+       d.mtx.Lock()
+       defer d.mtx.Unlock()
+
+       log.Info("check expired tables")
+
+       for key, analyzer := range d.analyzers {
+               if !analyzer.isExpired(currentTimestampSeconds) {
+                       continue
+               }
+
+               delete(d.analyzers, key)
+       }
+}
+
 func (d *partitionDetectorImpl) detect() {
        appMap, err := d.aggregate()
        if err != nil {
@@ -369,10 +419,8 @@ func calculateStats(
                }
 
                // Only primary replica of a partition will be counted.
-               // TODO(wangdan): support Equal() for base.HostPort.
                primary := stats.partitionConfigs[partitionID].HpPrimary
-               if primary.GetHost() != node.HpNode.GetHost() ||
-                       primary.GetPort() != node.HpNode.GetPort() {
+               if !node.HpNode.Equal(primary) {
                        continue
                }
 
@@ -439,6 +487,10 @@ func calculateHotspotStats(appMap appStatsMap) 
map[partitionAnalyzerKey][]hotspo
 func (d *partitionDetectorImpl) analyse(appMap appStatsMap) {
        hotspotMap := calculateHotspotStats(appMap)
 
+       nowTime := time.Now()
+       expireTime := nowTime.Add(d.cfg.RetentionPeriod)
+       expireTimestampSeconds := expireTime.Unix()
+
        d.mtx.Lock()
        defer d.mtx.Unlock()
 
@@ -455,7 +507,7 @@ func (d *partitionDetectorImpl) analyse(appMap appStatsMap) 
{
                        d.analyzers[key] = analyzer
                }
 
-               analyzer.add(value)
+               analyzer.add(value, expireTimestampSeconds)
 
                // Perform the analysis asynchronously.
                go analyzer.analyse()
@@ -489,13 +541,26 @@ type partitionAnalyzer struct {
        appID                    int32
        partitionCount           int32
        mtx                      sync.RWMutex
+       expireTimestampSeconds   int64
        samples                  deque.Deque[[]hotspotPartitionStats] // Each 
element is a sample of all partitions of the table
 }
 
-func (a *partitionAnalyzer) add(sample []hotspotPartitionStats) {
+func (a *partitionAnalyzer) isExpired(currentTimestampSeconds int64) bool {
+       a.mtx.RLock()
+       defer a.mtx.RUnlock()
+
+       return currentTimestampSeconds >= a.expireTimestampSeconds
+}
+
+func (a *partitionAnalyzer) add(
+       sample []hotspotPartitionStats,
+       expireTimestampSeconds int64,
+) {
        a.mtx.Lock()
        defer a.mtx.Unlock()
 
+       a.expireTimestampSeconds = expireTimestampSeconds
+
        for a.samples.Len() >= a.maxSampleSize {
                a.samples.PopFront()
        }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to