This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2bab820bbe [INLONG-9174][SDK] Fail fast when work is unavailable in
Golang SDK (#9177)
2bab820bbe is described below
commit 2bab820bbe7d99d78624ade3890ca692f1ff1ca2
Author: gunli <[email protected]>
AuthorDate: Wed Nov 1 09:50:29 2023 +0800
[INLONG-9174][SDK] Fail fast when work is unavailable in Golang SDK (#9177)
Co-authored-by: gunli <[email protected]>
---
.../dataproxy-sdk-golang/dataproxy/client.go | 35 +++++++++-------------
.../dataproxy-sdk-golang/dataproxy/worker.go | 2 +-
2 files changed, 15 insertions(+), 22 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
index 65f8160d37..fab00c2949 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
@@ -22,7 +22,6 @@ import (
"errors"
"math"
"sync"
- "time"
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool"
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/discoverer"
@@ -215,16 +214,16 @@ func (c *client) Dial(addr string) (gnet.Conn, error) {
}
func (c *client) Send(ctx context.Context, msg Message) error {
- worker := c.getWorker()
- if worker == nil {
+ worker, err := c.getWorker()
+ if err != nil {
return ErrNoAvailableWorker
}
return worker.send(ctx, msg)
}
func (c *client) SendAsync(ctx context.Context, msg Message, cb Callback) {
- worker := c.getWorker()
- if worker == nil {
+ worker, err := c.getWorker()
+ if err != nil {
if cb != nil {
cb(msg, ErrNoAvailableWorker)
}
@@ -234,23 +233,17 @@ func (c *client) SendAsync(ctx context.Context, msg
Message, cb Callback) {
worker.sendAsync(ctx, msg, cb)
}
-func (c *client) getWorker() *worker {
- for i := 0; i < c.options.WorkerNum; i++ {
- index := c.curWorkerIndex.Load()
- w := c.workers[index%uint64(len(c.workers))]
- c.curWorkerIndex.Add(1)
-
- if w.available() {
- return w
- } else if i == c.options.WorkerNum-1 {
- c.metrics.incError(errAllWorkerBusy.strCode)
- return w
- } else {
- time.Sleep(1 * time.Millisecond)
- continue
- }
+func (c *client) getWorker() (*worker, error) {
+ index := c.curWorkerIndex.Load()
+ w := c.workers[index%uint64(len(c.workers))]
+ c.curWorkerIndex.Add(1)
+
+ if w.available() {
+ return w, nil
}
- return nil
+
+ c.metrics.incError(workerBusy.strCode)
+ return nil, workerBusy
}
func (c *client) Close() {
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index bb37229d63..8a202ce937 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -62,7 +62,7 @@ var (
errBadLog = &errNo{code: 10010, strCode: "10010", message:
"input log is invalid"}
errServerError = &errNo{code: 10011, strCode: "10011", message:
"server error"}
errServerPanic = &errNo{code: 10012, strCode: "10012", message:
"server panic"}
- errAllWorkerBusy = &errNo{code: 10013, strCode: "10013", message:
"all workers are busy"}
+ workerBusy = &errNo{code: 10013, strCode: "10013", message:
"worker is busy"}
errNoMatchReq4Rsp = &errNo{code: 10014, strCode: "10014", message:
"no match unacknowledged request for response"}
errConnClosedByPeer = &errNo{code: 10015, strCode: "10015", message:
"conn closed by peer"}
errUnknown = &errNo{code: 20001, strCode: "20001", message:
"unknown"}