This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bab820bbe [INLONG-9174][SDK] Fail fast when work is unavailable in Golang SDK (#9177)
2bab820bbe is described below

commit 2bab820bbe7d99d78624ade3890ca692f1ff1ca2
Author: gunli <[email protected]>
AuthorDate: Wed Nov 1 09:50:29 2023 +0800

    [INLONG-9174][SDK] Fail fast when work is unavailable in Golang SDK (#9177)
    
    Co-authored-by: gunli <[email protected]>
---
 .../dataproxy-sdk-golang/dataproxy/client.go       | 35 +++++++++-------------
 .../dataproxy-sdk-golang/dataproxy/worker.go       |  2 +-
 2 files changed, 15 insertions(+), 22 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
index 65f8160d37..fab00c2949 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
@@ -22,7 +22,6 @@ import (
        "errors"
        "math"
        "sync"
-       "time"
 
        "github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool"
        "github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/discoverer"
@@ -215,16 +214,16 @@ func (c *client) Dial(addr string) (gnet.Conn, error) {
 }
 
 func (c *client) Send(ctx context.Context, msg Message) error {
-       worker := c.getWorker()
-       if worker == nil {
+       worker, err := c.getWorker()
+       if err != nil {
                return ErrNoAvailableWorker
        }
        return worker.send(ctx, msg)
 }
 
 func (c *client) SendAsync(ctx context.Context, msg Message, cb Callback) {
-       worker := c.getWorker()
-       if worker == nil {
+       worker, err := c.getWorker()
+       if err != nil {
                if cb != nil {
                        cb(msg, ErrNoAvailableWorker)
                }
@@ -234,23 +233,17 @@ func (c *client) SendAsync(ctx context.Context, msg Message, cb Callback) {
        worker.sendAsync(ctx, msg, cb)
 }
 
-func (c *client) getWorker() *worker {
-       for i := 0; i < c.options.WorkerNum; i++ {
-               index := c.curWorkerIndex.Load()
-               w := c.workers[index%uint64(len(c.workers))]
-               c.curWorkerIndex.Add(1)
-
-               if w.available() {
-                       return w
-               } else if i == c.options.WorkerNum-1 {
-                       c.metrics.incError(errAllWorkerBusy.strCode)
-                       return w
-               } else {
-                       time.Sleep(1 * time.Millisecond)
-                       continue
-               }
+func (c *client) getWorker() (*worker, error) {
+       index := c.curWorkerIndex.Load()
+       w := c.workers[index%uint64(len(c.workers))]
+       c.curWorkerIndex.Add(1)
+
+       if w.available() {
+               return w, nil
        }
-       return nil
+
+       c.metrics.incError(workerBusy.strCode)
+       return nil, workerBusy
 }
 
 func (c *client) Close() {
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index bb37229d63..8a202ce937 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -62,7 +62,7 @@ var (
        errBadLog           = &errNo{code: 10010, strCode: "10010", message: "input log is invalid"}
        errServerError      = &errNo{code: 10011, strCode: "10011", message: "server error"}
        errServerPanic      = &errNo{code: 10012, strCode: "10012", message: "server panic"}
-       errAllWorkerBusy    = &errNo{code: 10013, strCode: "10013", message: "all workers are busy"}
+       workerBusy          = &errNo{code: 10013, strCode: "10013", message: "worker is busy"}
        errNoMatchReq4Rsp   = &errNo{code: 10014, strCode: "10014", message: "no match unacknowledged request for response"}
        errConnClosedByPeer = &errNo{code: 10015, strCode: "10015", message: "conn closed by peer"}
        errUnknown          = &errNo{code: 20001, strCode: "20001", message: "unknown"}

Reply via email to