This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 40ce4cb23b [INLONG-12119][SDK] Add connection timeout and tolerate partial endpoint failures during client initialization in DataProxy Go SDK (#12122)
40ce4cb23b is described below

commit 40ce4cb23bd9804d0382377a31f5d1835ab7bead
Author: yfsn666 <[email protected]>
AuthorDate: Mon May 18 19:25:56 2026 +0800

    [INLONG-12119][SDK] Add connection timeout and tolerate partial endpoint failures during client initialization in DataProxy Go SDK (#12122)
---
 .../dataproxy-sdk-golang/connpool/connpool.go      | 36 +++++++++++++---------
 .../dataproxy-sdk-golang/dataproxy/client.go       |  8 ++++-
 2 files changed, 29 insertions(+), 15 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
index 5588091dc7..86898ed8c0 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
@@ -19,8 +19,10 @@ package connpool
 import (
        "context"
        "errors"
+       "fmt"
        "math"
        "runtime/debug"
+       "strings"
        "sync"
        "sync/atomic"
        "time"
@@ -253,8 +255,8 @@ func (p *connPool) dialNewConn(ep string) (gnet.Conn, error) {
 func (p *connPool) initConns(count int) error {
        // create some conns and then put them back to the pool
        var wg sync.WaitGroup
-       conns := make(chan gnet.Conn, count)
-       errs := make(chan error, count)
+       var successCount atomic.Int32
+       errs := make(chan string, count)
 
        for i := 0; i < count; i++ {
                wg.Add(1)
@@ -262,28 +264,34 @@ func (p *connPool) initConns(count int) error {
                        defer wg.Done()
                        conn, err := p.newConn()
                        if err != nil {
-                               errs <- err
+                               errs <- err.Error()
                                return
                        }
-                       conns <- conn
+                       p.put(conn, nil, true)
+                       successCount.Add(1)
                }()
        }
 
        wg.Wait()
-       close(conns)
        close(errs)
 
-       for err := range errs {
-               if err != nil {
-                       for conn := range conns {
-                               _ = conn.Close()
-                       }
-                       return err
+       succeeded := int(successCount.Load())
+       if failed := count - succeeded; failed > 0 {
+               // aggregate identical errors to avoid spammy logs
+               agg := make(map[string]int, failed)
+               for e := range errs {
+                       agg[e]++
                }
+               items := make([]string, 0, len(agg))
+               for msg, n := range agg {
+                       items = append(items, fmt.Sprintf("%dx %s", n, msg))
+               }
+               p.log.Warnf("initConns partial failure, required=%d, succeeded=%d, failed=%d, errors=[%s]",
+                       count, succeeded, failed, strings.Join(items, "; "))
        }
-
-       for conn := range conns {
-               p.put(conn, nil, true)
+       // Only fail initialization when no endpoint is reachable at all
+       if succeeded == 0 {
+               return ErrNoAvailableEndpoint
        }
 
        return nil
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
index 1046a10d01..f8833b25e3 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
@@ -21,6 +21,7 @@ import (
        "context"
        "errors"
        "math"
+       "net"
        "sync"
        "time"
 
@@ -213,7 +214,12 @@ func (c *client) initWorkers() error {
 }
 
 func (c *client) Dial(addr string, ctx any) (gnet.Conn, error) {
-       return c.netClient.DialContext("tcp", addr, ctx)
+       // Use net.DialTimeout to bound the TCP handshake duration
+       raw, err := net.DialTimeout("tcp", addr, c.options.ConnTimeout)
+       if err != nil {
+               return nil, err
+       }
+       return c.netClient.EnrollContext(raw, ctx)
 }
 
 func (c *client) Send(ctx context.Context, msg Message) error {

Reply via email to