This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4307016a91 [INLONG-12121][SDK] Log remote server address on send timeout and server errors in DataProxy Go SDK (#12124)
4307016a91 is described below
commit 4307016a91304b4535c12c39c03b4915120a9635
Author: yfsn666 <[email protected]>
AuthorDate: Mon May 18 19:26:59 2026 +0800
[INLONG-12121][SDK] Log remote server address on send timeout and server errors in DataProxy Go SDK (#12124)
---
.../dataproxy-sdk-golang/connpool/connpool.go | 5 ++--
.../dataproxy-sdk-golang/dataproxy/request.go | 33 +++++++++++-----------
.../dataproxy-sdk-golang/dataproxy/worker.go | 26 +++++++++++------
3 files changed, 37 insertions(+), 27 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
index 86898ed8c0..77b3eb75f7 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
@@ -575,7 +575,8 @@ func (p *connPool) innerWork() {
}
}
-func getRemoteAddr(conn gnet.Conn) string {
+// GetRemoteAddr returns the remote address of the connection safely
+func GetRemoteAddr(conn gnet.Conn) string {
if conn == nil {
return ""
}
@@ -628,7 +629,7 @@ loop:
// close the expired conn and append new conn with the same addr
for _, expired := range expiredConns {
- addr := getRemoteAddr(expired)
+ addr := GetRemoteAddr(expired)
p.log.Debug("connection expired, close it, addr:", addr, ",
err:", nil)
CloseConn(expired, defaultConnCloseDelay)
_ = p.appendNewConn(addr)
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
index 82100ef25d..1eabd39e8a 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
@@ -86,22 +86,23 @@ func (h heartbeatReq) encode(buffer *bytes.Buffer) []byte {
type batchCallback func()
type batchReq struct {
- pool *sync.Pool
- workerID string
- batchID string
- groupID string
- streamID string
- dataReqs []*sendDataReq
- dataSize int
- batchTime time.Time
- lastSendTime time.Time
- retries int
- bufferPool bufferpool.BufferPool
- bytePool bufferpool.BytePool
- buffer *bytes.Buffer
- callback batchCallback
- metrics *metrics
- addColumns string
+ pool *sync.Pool
+ workerID string
+ batchID string
+ groupID string
+ streamID string
+ dataReqs []*sendDataReq
+ dataSize int
+ batchTime time.Time
+ lastSendTime time.Time
+ lastSendServerAddr string
+ retries int
+ bufferPool bufferpool.BufferPool
+ bytePool bufferpool.BytePool
+ buffer *bytes.Buffer
+ callback batchCallback
+ metrics *metrics
+ addColumns string
}
func (b *batchReq) append(req *sendDataReq) {
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index 46c6cfde3c..02ad9cda73 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -30,6 +30,7 @@ import (
"go.uber.org/atomic"
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/bufferpool"
+
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool"
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/logger"
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/syncx"
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/util"
@@ -416,6 +417,10 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
b.lastSendTime = time.Now()
b.encode()
+ // record the remote server address at the moment the batch is
dispatched
+ conn := w.getConn()
+ b.lastSendServerAddr = connpool.GetRemoteAddr(conn)
+
// error callback
onErr := func(c gnet.Conn, e error, inCallback bool) {
defer func() {
@@ -427,7 +432,8 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
}()
w.metrics.incError(errConnWriteFailed.getStrCode())
- w.log.Error("send batch failed, err: ", e, ", inCallback: ",
inCallback, ", logNum:", len(b.dataReqs))
+ w.log.Error("send batch failed, err: ", e, ", inCallback: ",
inCallback, ", logNum:", len(b.dataReqs),
+ ", workerID:", w.index, ", batchID:", b.batchID, ",
serverAddr:", b.lastSendServerAddr)
// close already
if w.getState() == stateClosed {
@@ -459,9 +465,8 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
}
// very important:'cause we use gnet, we must call AsyncWrite to send
data in goroutines that are different from gnet.OnTraffic() callback
- conn := w.getConn()
if b.retries > 0 {
- w.log.Debug("retry batch to conn:", conn.RemoteAddr(), ",
workerID:", w.index, ", batchID:", b.batchID, ", logNum:", len(b.dataReqs))
+ w.log.Debug("retry batch to conn:", b.lastSendServerAddr, ",
workerID:", w.index, ", batchID:", b.batchID, ", logNum:", len(b.dataReqs))
}
err := conn.AsyncWrite(b.buffer.Bytes(), func(c gnet.Conn, e error)
error {
if e != nil {
@@ -564,7 +569,8 @@ func (w *worker) handleSendTimeout() {
for batchID, batch := range w.unackedBatches {
if time.Since(batch.lastSendTime) > w.options.SendTimeout {
w.log.Warn("worker[", w.index, "] send timeout, resend
it now:", batch.batchID, "batchID:", batchID,
- ",last send time:",
batch.lastSendTime.UnixMilli(), ", now:", time.Now().UnixMilli(), "timeout
option:", w.options.SendTimeout)
+ ", last send time:",
batch.lastSendTime.UnixMilli(), ", now:", time.Now().UnixMilli(),
+ ", timeout option:", w.options.SendTimeout, ",
serverAddr:", batch.lastSendServerAddr)
//
// w.retryBatches <- batch
w.backoffRetry(context.Background(), batch)
@@ -651,7 +657,7 @@ func (w *worker) handleRsp(rsp *batchRsp) {
needSwitchConn, isRetryable :=
retryableServerErrorCodes[rsp.errCode]
if needSwitchConn && w.client != nil {
w.log.Warn("server error detected, switching
connection, errCode:", rsp.errCode,
- ", batchID:", batch.batchID)
+ ", batchID:", batch.batchID, ", serverAddr:",
batch.lastSendServerAddr)
w.updateConn(nil, nil)
}
@@ -659,8 +665,8 @@ func (w *worker) handleRsp(rsp *batchRsp) {
if w.options.RetryOnServerError && isRetryable && batch.retries
< w.options.MaxRetries {
delete(w.unackedBatches, batchID)
- w.log.Warn("server error, will retry, errCode:",
rsp.errCode,
- ", batchID:", batch.batchID, ", retries:",
batch.retries)
+ w.log.Warn("server error, will retry, errCode:",
rsp.errCode, ", batchID:", batch.batchID,
+ ", retries:", batch.retries, ", serverAddr:",
batch.lastSendServerAddr)
w.backoffRetry(context.Background(), batch)
return
@@ -674,10 +680,12 @@ func (w *worker) handleRsp(rsp *batchRsp) {
", batchID=" + rsp.batchID +
", groupID=" + rsp.groupID +
", streamID=" + rsp.streamID +
- ", dt=" + rsp.dt,
+ ", dt=" + rsp.dt +
+ ", serverAddr=" + batch.lastSendServerAddr,
serverErrCode: rsp.errCode,
}
- w.log.Error("send succeed but got error code:", rsp.errCode)
+ w.log.Error("send succeed but got error code:", rsp.errCode,
+ ", batchID:", batch.batchID, ", serverAddr:",
batch.lastSendServerAddr)
}
batch.done(err)
delete(w.unackedBatches, batchID)