This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0d07152244 [INLONG-9180][SDK] Cache up batchReq.dataReqs Golang SDK
(#9181)
0d07152244 is described below
commit 0d07152244153db1299d75a1a64ebb0a8b891d05
Author: gunli <[email protected]>
AuthorDate: Wed Nov 1 09:47:14 2023 +0800
[INLONG-9180][SDK] Cache up batchReq.dataReqs Golang SDK (#9181)
Co-authored-by: gunli <[email protected]>
---
.../dataproxy-sdk-golang/dataproxy/request.go | 23 +++++++++++++++++-----
.../dataproxy-sdk-golang/dataproxy/worker.go | 6 +++++-
2 files changed, 23 insertions(+), 6 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
index d93b0af006..187e074415 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
@@ -53,7 +53,9 @@ func init() {
}
batchPool = &sync.Pool{
New: func() interface{} {
- return &batchReq{}
+ return &batchReq{
+ dataReqs: make([]*sendDataReq, 0, 50),
+ }
},
}
}
@@ -109,12 +111,17 @@ func (b *batchReq) append(req *sendDataReq) {
func (b *batchReq) done(err error) {
errorCode := getErrorCode(err)
- for _, req := range b.dataReqs {
+ for i, req := range b.dataReqs {
req.done(err, errorCode)
+ b.dataReqs[i] = nil
+ }
+ if b.dataReqs != nil {
+ b.dataReqs = b.dataReqs[:0]
}
if b.callback != nil {
b.callback()
+ b.callback = nil
}
if b.buffer != nil && b.bufferPool != nil {
@@ -128,10 +135,12 @@ func (b *batchReq) done(err error) {
}
b.metrics.observeTime(errorCode,
time.Since(b.batchTime).Milliseconds())
b.metrics.observeSize(errorCode, b.dataSize)
+ b.metrics = nil
}
if b.pool != nil {
b.pool.Put(b)
+ b.pool = nil
}
}
@@ -334,25 +343,29 @@ type sendDataReq struct {
func (s *sendDataReq) done(err error, errCode string) {
if s.semaphore != nil {
s.semaphore.Release()
+ if s.metrics != nil {
+ s.metrics.decPending(s.workerID)
+ }
+ s.semaphore = nil
}
if s.callback != nil {
s.callback(s.msg, err)
+ s.callback = nil
}
if s.metrics != nil {
- if s.semaphore != nil {
- s.metrics.decPending(s.workerID)
- }
if errCode == "" {
errCode = getErrorCode(err)
}
s.metrics.incMessage(errCode)
+ s.metrics = nil
}
if s.pool != nil {
s.pool.Put(s)
+ s.pool = nil
}
}
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index be3772c38d..d5ab3018cf 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -322,13 +322,17 @@ func (w *worker) handleSendData(req *sendDataReq) {
if !ok {
streamID := req.msg.StreamID
batch = batchPool.Get().(*batchReq)
+ dataReqs := batch.dataReqs
+ if dataReqs == nil {
+ dataReqs = make([]*sendDataReq, 0,
w.options.BatchingMaxMessages)
+ }
*batch = batchReq{
pool: batchPool,
workerID: w.indexStr,
batchID: util.SnowFlakeID(),
groupID: w.options.GroupID,
streamID: streamID,
- dataReqs: make([]*sendDataReq, 0,
w.options.BatchingMaxMessages),
+ dataReqs: dataReqs,
batchTime: time.Now(),
retries: 0,
bufferPool: w.bufferPool,