This is an automated email from the ASF dual-hosted git repository.

yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new b4937faa6 fix(go-client): loopForRequest not return and retry forever (#1444)
b4937faa6 is described below

commit b4937faa6f07248cba6dd887b4e036b23c8357e2
Author: Yandi Lee <[email protected]>
AuthorDate: Tue Apr 18 15:16:17 2023 +0800

    fix(go-client): loopForRequest not return and retry forever (#1444)
---
 go-client/pegasus/table_connector.go      |  1 +
 go-client/pegasus/table_connector_test.go |  2 +-
 go-client/session/session.go              | 12 +++++++++++-
 3 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/go-client/pegasus/table_connector.go b/go-client/pegasus/table_connector.go
index 346bfebec..d1074a7bf 100644
--- a/go-client/pegasus/table_connector.go
+++ b/go-client/pegasus/table_connector.go
@@ -726,6 +726,7 @@ func (p *pegasusTableConnector) handleReplicaError(err error, replica *session.R
                        err = errors.New(err.Error() + " Rate of requests exceeds the throughput limit")
                case base.ERR_INVALID_STATE:
                        err = errors.New(err.Error() + " The target replica is not primary")
+                       retry = false
                case base.ERR_OBJECT_NOT_FOUND:
                        err = errors.New(err.Error() + " The replica server doesn't serve this partition")
                case base.ERR_SPLITTING:
diff --git a/go-client/pegasus/table_connector_test.go b/go-client/pegasus/table_connector_test.go
index 4aabe5eb4..1b2874765 100644
--- a/go-client/pegasus/table_connector_test.go
+++ b/go-client/pegasus/table_connector_test.go
@@ -261,7 +261,7 @@ func TestPegasusTableConnector_TriggerSelfUpdate(t *testing.T) {
        <-ptb.confUpdateCh
        assert.Error(t, err)
        assert.True(t, confUpdate)
-       assert.True(t, retry)
+       assert.False(t, retry)
 
        confUpdate, retry, err = ptb.handleReplicaError(base.ERR_PARENT_PARTITION_MISUSED, nil)
        <-ptb.confUpdateCh
diff --git a/go-client/session/session.go b/go-client/session/session.go
index 034d91c92..20aa585ae 100644
--- a/go-client/session/session.go
+++ b/go-client/session/session.go
@@ -226,8 +226,16 @@ func (n *nodeSession) notifyCallerAndDrop(req *requestListener) {
 // single-routine worker used for sending requests.
 // Any error occurred will end up this goroutine as well as the connection.
 func (n *nodeSession) loopForRequest() error { // no error returned actually
+       //add ticker to trigger loop return
+       // since if correlative loopForResponse returned because of IsNetworkClosed(EOF),
+       // this loop will not receive any signal and runs forever
+       ticker := time.NewTicker(1 * time.Millisecond)
        for {
                select {
+               case <-ticker.C:
+                       if n.conn.GetState() != rpc.ConnStateReady {
+                               return nil
+                       }
                case <-n.tom.Dying():
                        return nil
                case req := <-n.reqc:
@@ -335,7 +343,9 @@ func (n *nodeSession) waitUntilSessionReady(ctx context.Context) error {
                }
 
                if !ready {
-                       return fmt.Errorf("session %s is unable to connect (used %dms), the context error: %s", n, time.Since(dialStart)/time.Millisecond, ctx.Err())
+                       //return error directly so that it can be recognized in `handleReplicaError`
+                       n.logger.Printf("session %s is unable to connect (used %dms), the context error: %s", n, time.Since(dialStart)/time.Millisecond, ctx.Err())
+                       return ctx.Err()
                }
        }
        return nil


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to