This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8cf5ed742b [INLONG-11703][SDK] Improve the cleanExpiredConnTicker
select logic in the connpool of the Golang SDK (#11704)
8cf5ed742b is described below
commit 8cf5ed742b18ddb2ebbfcc5517cbbaff62b594b5
Author: gunli <[email protected]>
AuthorDate: Wed Jan 22 14:41:29 2025 +0800
[INLONG-11703][SDK] Improve the cleanExpiredConnTicker select logic in the
connpool of the Golang SDK (#11704)
Co-authored-by: gunli <[email protected]>
---
.../dataproxy-sdk-golang/connpool/connpool.go | 30 +++++++---------------
1 file changed, 9 insertions(+), 21 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
index 6c28e10d53..5588091dc7 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
@@ -144,7 +144,7 @@ func NewConnPool(initEndpoints []string, connsPerEndpoint,
size int,
}
// starts a backbround task, do rebalancing and recovering periodically
- go pool.recoverAndRebalance()
+ go pool.innerWork()
return pool, nil
}
@@ -527,8 +527,8 @@ func (p *connPool) markUnavailable(ep string) {
p.retryCounts.Store(ep, 0)
}
-// recoverAndRebalance recovers the down endpoint and rebalaces the conns
periodically
-func (p *connPool) recoverAndRebalance() {
+// innerWork recovers the down endpoint, rebalaces the conns and cleans the
expired conns periodically
+func (p *connPool) innerWork() {
// server failure is a low-probability event, so there's basically no
endpoint need to recover, a higher frequency is also acceptable
recoverTicker := time.NewTicker(10 * time.Second)
defer recoverTicker.Stop()
@@ -540,15 +540,12 @@ func (p *connPool) recoverAndRebalance() {
defer reBalanceTicker.Stop()
// clean expired conn every minute
- var cleanExpiredConnTicker *time.Ticker
+ var cleanExpiredConnChan <-chan time.Time
if p.maxConnLifetime > 0 {
- cleanExpiredConnTicker = time.NewTicker(1 * time.Minute)
+ cleanExpiredConnTicker := time.NewTicker(1 * time.Minute)
+ defer cleanExpiredConnTicker.Stop()
+ cleanExpiredConnChan = cleanExpiredConnTicker.C
}
- defer func() {
- if cleanExpiredConnTicker != nil {
- cleanExpiredConnTicker.Stop()
- }
- }()
for {
select {
@@ -564,17 +561,8 @@ func (p *connPool) recoverAndRebalance() {
p.rebalance()
case <-p.closeCh:
return
- default:
- if cleanExpiredConnTicker != nil {
- select {
- case <-cleanExpiredConnTicker.C:
- p.cleanExpiredConns()
- default:
- time.Sleep(time.Second)
- }
- } else {
- time.Sleep(time.Second)
- }
+ case <-cleanExpiredConnChan:
+ p.cleanExpiredConns()
}
}
}