This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cf5ed742b [INLONG-11703][SDK] Improve the cleanExpiredConnTicker select logic in the connpool of the Golang SDK (#11704)
8cf5ed742b is described below

commit 8cf5ed742b18ddb2ebbfcc5517cbbaff62b594b5
Author: gunli <[email protected]>
AuthorDate: Wed Jan 22 14:41:29 2025 +0800

    [INLONG-11703][SDK] Improve the cleanExpiredConnTicker select logic in the connpool of the Golang SDK (#11704)
    
    Co-authored-by: gunli <[email protected]>
---
 .../dataproxy-sdk-golang/connpool/connpool.go      | 30 +++++++---------------
 1 file changed, 9 insertions(+), 21 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
index 6c28e10d53..5588091dc7 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
@@ -144,7 +144,7 @@ func NewConnPool(initEndpoints []string, connsPerEndpoint, size int,
        }
 
        // starts a backbround task, do rebalancing and recovering periodically
-       go pool.recoverAndRebalance()
+       go pool.innerWork()
 
        return pool, nil
 }
@@ -527,8 +527,8 @@ func (p *connPool) markUnavailable(ep string) {
        p.retryCounts.Store(ep, 0)
 }
 
-// recoverAndRebalance recovers the down endpoint and rebalaces the conns periodically
-func (p *connPool) recoverAndRebalance() {
+// innerWork recovers the down endpoint, rebalaces the conns and cleans the expired conns periodically
+func (p *connPool) innerWork() {
        // server failure is a low-probability event, so there's basically no endpoint need to recover, a higher frequency is also acceptable
        recoverTicker := time.NewTicker(10 * time.Second)
        defer recoverTicker.Stop()
@@ -540,15 +540,12 @@ func (p *connPool) recoverAndRebalance() {
        defer reBalanceTicker.Stop()
 
        // clean expired conn every minute
-       var cleanExpiredConnTicker *time.Ticker
+       var cleanExpiredConnChan <-chan time.Time
        if p.maxConnLifetime > 0 {
-               cleanExpiredConnTicker = time.NewTicker(1 * time.Minute)
+               cleanExpiredConnTicker := time.NewTicker(1 * time.Minute)
+               defer cleanExpiredConnTicker.Stop()
+               cleanExpiredConnChan = cleanExpiredConnTicker.C
        }
-       defer func() {
-               if cleanExpiredConnTicker != nil {
-                       cleanExpiredConnTicker.Stop()
-               }
-       }()
 
        for {
                select {
@@ -564,17 +561,8 @@ func (p *connPool) recoverAndRebalance() {
                        p.rebalance()
                case <-p.closeCh:
                        return
-               default:
-                       if cleanExpiredConnTicker != nil {
-                               select {
-                               case <-cleanExpiredConnTicker.C:
-                                       p.cleanExpiredConns()
-                               default:
-                                       time.Sleep(time.Second)
-                               }
-                       } else {
-                               time.Sleep(time.Second)
-                       }
+               case <-cleanExpiredConnChan:
+                       p.cleanExpiredConns()
                }
        }
 }

Reply via email to