This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 40ce4cb23b [INLONG-12119][SDK] Add connection timeout and tolerate
partial endpoint failures during client initialization in DataProxy Go SDK
(#12122)
40ce4cb23b is described below
commit 40ce4cb23bd9804d0382377a31f5d1835ab7bead
Author: yfsn666 <[email protected]>
AuthorDate: Mon May 18 19:25:56 2026 +0800
[INLONG-12119][SDK] Add connection timeout and tolerate partial endpoint
failures during client initialization in DataProxy Go SDK (#12122)
---
.../dataproxy-sdk-golang/connpool/connpool.go | 36 +++++++++++++---------
.../dataproxy-sdk-golang/dataproxy/client.go | 8 ++++-
2 files changed, 29 insertions(+), 15 deletions(-)
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
index 5588091dc7..86898ed8c0 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
@@ -19,8 +19,10 @@ package connpool
import (
"context"
"errors"
+ "fmt"
"math"
"runtime/debug"
+ "strings"
"sync"
"sync/atomic"
"time"
@@ -253,8 +255,8 @@ func (p *connPool) dialNewConn(ep string) (gnet.Conn, error) {
func (p *connPool) initConns(count int) error {
// create some conns and then put them back to the pool
var wg sync.WaitGroup
- conns := make(chan gnet.Conn, count)
- errs := make(chan error, count)
+ var successCount atomic.Int32
+ errs := make(chan string, count)
for i := 0; i < count; i++ {
wg.Add(1)
@@ -262,28 +264,34 @@ func (p *connPool) initConns(count int) error {
defer wg.Done()
conn, err := p.newConn()
if err != nil {
- errs <- err
+ errs <- err.Error()
return
}
- conns <- conn
+ p.put(conn, nil, true)
+ successCount.Add(1)
}()
}
wg.Wait()
- close(conns)
close(errs)
- for err := range errs {
- if err != nil {
- for conn := range conns {
- _ = conn.Close()
- }
- return err
+ succeeded := int(successCount.Load())
+ if failed := count - succeeded; failed > 0 {
+ // aggregate identical errors to avoid spammy logs
+ agg := make(map[string]int, failed)
+ for e := range errs {
+ agg[e]++
}
+ items := make([]string, 0, len(agg))
+ for msg, n := range agg {
+ items = append(items, fmt.Sprintf("%dx %s", n, msg))
+ }
+ p.log.Warnf("initConns partial failure, required=%d, succeeded=%d, failed=%d, errors=[%s]",
+ count, succeeded, failed, strings.Join(items, "; "))
}
-
- for conn := range conns {
- p.put(conn, nil, true)
+ // Only fail initialization when no endpoint is reachable at all
+ if succeeded == 0 {
+ return ErrNoAvailableEndpoint
}
return nil
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
index 1046a10d01..f8833b25e3 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
@@ -21,6 +21,7 @@ import (
"context"
"errors"
"math"
+ "net"
"sync"
"time"
@@ -213,7 +214,12 @@ func (c *client) initWorkers() error {
}
func (c *client) Dial(addr string, ctx any) (gnet.Conn, error) {
- return c.netClient.DialContext("tcp", addr, ctx)
+ // Use net.DialTimeout to bound the TCP handshake duration
+ raw, err := net.DialTimeout("tcp", addr, c.options.ConnTimeout)
+ if err != nil {
+ return nil, err
+ }
+ return c.netClient.EnrollContext(raw, ctx)
}
func (c *client) Send(ctx context.Context, msg Message) error {