This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4307016a91 [INLONG-12121][SDK] Log remote server address on send timeout and server errors in DataProxy Go SDK (#12124)
4307016a91 is described below

commit 4307016a91304b4535c12c39c03b4915120a9635
Author: yfsn666 <[email protected]>
AuthorDate: Mon May 18 19:26:59 2026 +0800

    [INLONG-12121][SDK] Log remote server address on send timeout and server errors in DataProxy Go SDK (#12124)
---
 .../dataproxy-sdk-golang/connpool/connpool.go      |  5 ++--
 .../dataproxy-sdk-golang/dataproxy/request.go      | 33 +++++++++++-----------
 .../dataproxy-sdk-golang/dataproxy/worker.go       | 26 +++++++++++------
 3 files changed, 37 insertions(+), 27 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
index 86898ed8c0..77b3eb75f7 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
@@ -575,7 +575,8 @@ func (p *connPool) innerWork() {
        }
 }
 
-func getRemoteAddr(conn gnet.Conn) string {
+// GetRemoteAddr returns the remote address of the connection safely
+func GetRemoteAddr(conn gnet.Conn) string {
        if conn == nil {
                return ""
        }
@@ -628,7 +629,7 @@ loop:
 
        // close the expired conn and append new conn with the same addr
        for _, expired := range expiredConns {
-               addr := getRemoteAddr(expired)
+               addr := GetRemoteAddr(expired)
                p.log.Debug("connection expired, close it, addr:", addr, ", 
err:", nil)
                CloseConn(expired, defaultConnCloseDelay)
                _ = p.appendNewConn(addr)
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
index 82100ef25d..1eabd39e8a 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
@@ -86,22 +86,23 @@ func (h heartbeatReq) encode(buffer *bytes.Buffer) []byte {
 
 type batchCallback func()
 type batchReq struct {
-       pool         *sync.Pool
-       workerID     string
-       batchID      string
-       groupID      string
-       streamID     string
-       dataReqs     []*sendDataReq
-       dataSize     int
-       batchTime    time.Time
-       lastSendTime time.Time
-       retries      int
-       bufferPool   bufferpool.BufferPool
-       bytePool     bufferpool.BytePool
-       buffer       *bytes.Buffer
-       callback     batchCallback
-       metrics      *metrics
-       addColumns   string
+       pool               *sync.Pool
+       workerID           string
+       batchID            string
+       groupID            string
+       streamID           string
+       dataReqs           []*sendDataReq
+       dataSize           int
+       batchTime          time.Time
+       lastSendTime       time.Time
+       lastSendServerAddr string
+       retries            int
+       bufferPool         bufferpool.BufferPool
+       bytePool           bufferpool.BytePool
+       buffer             *bytes.Buffer
+       callback           batchCallback
+       metrics            *metrics
+       addColumns         string
 }
 
 func (b *batchReq) append(req *sendDataReq) {
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index 46c6cfde3c..02ad9cda73 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -30,6 +30,7 @@ import (
        "go.uber.org/atomic"
 
        
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/bufferpool"
+       
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool"
        
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/logger"
        
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/syncx"
        
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/util"
@@ -416,6 +417,10 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
        b.lastSendTime = time.Now()
        b.encode()
 
+       // record the remote server address at the moment the batch is 
dispatched
+       conn := w.getConn()
+       b.lastSendServerAddr = connpool.GetRemoteAddr(conn)
+
        // error callback
        onErr := func(c gnet.Conn, e error, inCallback bool) {
                defer func() {
@@ -427,7 +432,8 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
                }()
 
                w.metrics.incError(errConnWriteFailed.getStrCode())
-               w.log.Error("send batch failed, err: ", e, ", inCallback: ", 
inCallback, ", logNum:", len(b.dataReqs))
+               w.log.Error("send batch failed, err: ", e, ", inCallback: ", 
inCallback, ", logNum:", len(b.dataReqs),
+                       ", workerID:", w.index, ", batchID:", b.batchID, ", 
serverAddr:", b.lastSendServerAddr)
 
                // close already
                if w.getState() == stateClosed {
@@ -459,9 +465,8 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
        }
 
        // very important:'cause we use gnet, we must call AsyncWrite to send 
data in goroutines that are different from gnet.OnTraffic() callback
-       conn := w.getConn()
        if b.retries > 0 {
-               w.log.Debug("retry batch to conn:", conn.RemoteAddr(), ", 
workerID:", w.index, ", batchID:", b.batchID, ", logNum:", len(b.dataReqs))
+               w.log.Debug("retry batch to conn:", b.lastSendServerAddr, ", 
workerID:", w.index, ", batchID:", b.batchID, ", logNum:", len(b.dataReqs))
        }
        err := conn.AsyncWrite(b.buffer.Bytes(), func(c gnet.Conn, e error) 
error {
                if e != nil {
@@ -564,7 +569,8 @@ func (w *worker) handleSendTimeout() {
        for batchID, batch := range w.unackedBatches {
                if time.Since(batch.lastSendTime) > w.options.SendTimeout {
                        w.log.Warn("worker[", w.index, "] send timeout, resend 
it now:", batch.batchID, "batchID:", batchID,
-                               ",last send time:", 
batch.lastSendTime.UnixMilli(), ", now:", time.Now().UnixMilli(), "timeout 
option:", w.options.SendTimeout)
+                               ", last send time:", 
batch.lastSendTime.UnixMilli(), ", now:", time.Now().UnixMilli(),
+                               ", timeout option:", w.options.SendTimeout, ", 
serverAddr:", batch.lastSendServerAddr)
                        //
                        // w.retryBatches <- batch
                        w.backoffRetry(context.Background(), batch)
@@ -651,7 +657,7 @@ func (w *worker) handleRsp(rsp *batchRsp) {
                needSwitchConn, isRetryable := 
retryableServerErrorCodes[rsp.errCode]
                if needSwitchConn && w.client != nil {
                        w.log.Warn("server error detected, switching 
connection, errCode:", rsp.errCode,
-                               ", batchID:", batch.batchID)
+                               ", batchID:", batch.batchID, ", serverAddr:", 
batch.lastSendServerAddr)
                        w.updateConn(nil, nil)
                }
 
@@ -659,8 +665,8 @@ func (w *worker) handleRsp(rsp *batchRsp) {
                if w.options.RetryOnServerError && isRetryable && batch.retries 
< w.options.MaxRetries {
                        delete(w.unackedBatches, batchID)
 
-                       w.log.Warn("server error, will retry, errCode:", 
rsp.errCode,
-                               ", batchID:", batch.batchID, ", retries:", 
batch.retries)
+                       w.log.Warn("server error, will retry, errCode:", 
rsp.errCode, ", batchID:", batch.batchID,
+                               ", retries:", batch.retries, ", serverAddr:", 
batch.lastSendServerAddr)
 
                        w.backoffRetry(context.Background(), batch)
                        return
@@ -674,10 +680,12 @@ func (w *worker) handleRsp(rsp *batchRsp) {
                                ", batchID=" + rsp.batchID +
                                ", groupID=" + rsp.groupID +
                                ", streamID=" + rsp.streamID +
-                               ", dt=" + rsp.dt,
+                               ", dt=" + rsp.dt +
+                               ", serverAddr=" + batch.lastSendServerAddr,
                        serverErrCode: rsp.errCode,
                }
-               w.log.Error("send succeed but got error code:", rsp.errCode)
+               w.log.Error("send succeed but got error code:", rsp.errCode,
+                       ", batchID:", batch.batchID, ", serverAddr:", 
batch.lastSendServerAddr)
        }
        batch.done(err)
        delete(w.unackedBatches, batchID)

Reply via email to