This is an automated email from the ASF dual-hosted git repository.

luky116 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ce10062 refactor: refact getty module (#746)
8ce10062 is described below

commit 8ce100621fedcc7ce3bdb4222fe11049e54bbdd4
Author: xinfan.wu <[email protected]>
AuthorDate: Sat Dec 21 15:32:32 2024 +0800

    refactor: refact getty module (#746)
    
    * feat:add more linter
    
    * feat:change golangclilint version to 1.57.x to support more linter
    
    * feat:adjust lint conf and adjust the code to pass the check
    
    * style: format some code; fix: some sql statement or rows was not been 
closed
    
    * feat:close session when send heart beat message failed
    
    * refactor: change GettyRemoting from singleton to GettyRemotingClient 
member
    
    * remove rpc client
    
    * refactor: complete GettyRemoting refactoring
    
    * refactor: change getter name, improve variable naming and remove unused 
fields
    
    * update something
    
    * feat:add heart-beat failed retry times
    
    * Refactor getty.
    >
    >
    Co-authored-by: marsevilspirit <[email protected]>
    Co-authored-by: solisamicus <[email protected]>
    Co-authored-by: No-SilverBullet  <[email protected]>
    
    * Refactor getty.
    
    Co-authored-by: marsevilspirit <[email protected]>
    Co-authored-by: solisamicus <[email protected]>
    Co-authored-by: No-SilverBullet  <[email protected]>
    
    ---------
    
    Co-authored-by: JayLiu <[email protected]>
    Co-authored-by: SolisAmicus <[email protected]>
    Co-authored-by: marsevilspirit <[email protected]>
---
 pkg/client/client.go                               |   8 +-
 pkg/remoting/config/config.go                      |   2 +-
 pkg/remoting/getty/getty_client.go                 |  34 ++++-
 pkg/remoting/getty/getty_client_test.go            |   6 +-
 pkg/remoting/getty/getty_init.go                   |  29 +++++
 pkg/remoting/getty/getty_remoting.go               |  20 +--
 pkg/remoting/getty/getty_remoting_test.go          |  36 +++---
 pkg/remoting/getty/listener.go                     |  39 +++---
 pkg/remoting/getty/rpc_client.go                   | 138 ---------------------
 pkg/remoting/getty/session_manager.go              | 120 ++++++++++++++++--
 .../client/client_on_response_processor.go         |  15 +--
 11 files changed, 233 insertions(+), 214 deletions(-)

diff --git a/pkg/client/client.go b/pkg/client/client.go
index 5c119187..2b3cd38a 100644
--- a/pkg/client/client.go
+++ b/pkg/client/client.go
@@ -62,15 +62,17 @@ func initTmClient(cfg *Config) {
        })
 }
 
-// initRemoting init rpc client
+// initRemoting init remoting
 func initRemoting(cfg *Config) {
-       getty.InitRpcClient(&cfg.GettyConfig, &remoteConfig.SeataConfig{
+       seataConfig := remoteConfig.SeataConfig{
                ApplicationID:        cfg.ApplicationID,
                TxServiceGroup:       cfg.TxServiceGroup,
                ServiceVgroupMapping: cfg.ServiceConfig.VgroupMapping,
                ServiceGrouplist:     cfg.ServiceConfig.Grouplist,
                LoadBalanceType:      cfg.GettyConfig.LoadBalanceType,
-       })
+       }
+
+       getty.InitGetty(&cfg.GettyConfig, &seataConfig)
 }
 
 // InitRmClient init client rm client
diff --git a/pkg/remoting/config/config.go b/pkg/remoting/config/config.go
index 5bb12667..823440c7 100644
--- a/pkg/remoting/config/config.go
+++ b/pkg/remoting/config/config.go
@@ -84,7 +84,7 @@ type SeataConfig struct {
        LoadBalanceType      string
 }
 
-func IniConfig(seataConf *SeataConfig) {
+func InitConfig(seataConf *SeataConfig) {
        seataConfig = seataConf
 }
 
diff --git a/pkg/remoting/getty/getty_client.go 
b/pkg/remoting/getty/getty_client.go
index 72874b0f..470f2a4f 100644
--- a/pkg/remoting/getty/getty_client.go
+++ b/pkg/remoting/getty/getty_client.go
@@ -35,14 +35,16 @@ var (
 )
 
 type GettyRemotingClient struct {
-       idGenerator *atomic.Uint32
+       idGenerator   *atomic.Uint32
+       gettyRemoting *GettyRemoting
 }
 
 func GetGettyRemotingClient() *GettyRemotingClient {
        if gettyRemotingClient == nil {
                onceGettyRemotingClient.Do(func() {
                        gettyRemotingClient = &GettyRemotingClient{
-                               idGenerator: &atomic.Uint32{},
+                               idGenerator:   &atomic.Uint32{},
+                               gettyRemoting: newGettyRemoting(),
                        }
                })
        }
@@ -63,7 +65,7 @@ func (client *GettyRemotingClient) SendAsyncRequest(msg 
interface{}) error {
                Compressor: 0,
                Body:       msg,
        }
-       return GetGettyRemotingInstance().SendASync(rpcMessage, nil, 
client.asyncCallback)
+       return client.gettyRemoting.SendAsync(rpcMessage, nil, 
client.asyncCallback)
 }
 
 func (client *GettyRemotingClient) SendAsyncResponse(msgID int32, msg 
interface{}) error {
@@ -74,7 +76,7 @@ func (client *GettyRemotingClient) SendAsyncResponse(msgID 
int32, msg interface{
                Compressor: 0,
                Body:       msg,
        }
-       return GetGettyRemotingInstance().SendASync(rpcMessage, nil, nil)
+       return client.gettyRemoting.SendAsync(rpcMessage, nil, nil)
 }
 
 func (client *GettyRemotingClient) SendSyncRequest(msg interface{}) 
(interface{}, error) {
@@ -85,7 +87,7 @@ func (client *GettyRemotingClient) SendSyncRequest(msg 
interface{}) (interface{}
                Compressor: 0,
                Body:       msg,
        }
-       return GetGettyRemotingInstance().SendSync(rpcMessage, nil, 
client.syncCallback)
+       return client.gettyRemoting.SendSync(rpcMessage, nil, 
client.syncCallback)
 }
 
 func (g *GettyRemotingClient) asyncCallback(reqMsg message.RpcMessage, respMsg 
*message.MessageFuture) (interface{}, error) {
@@ -96,10 +98,30 @@ func (g *GettyRemotingClient) asyncCallback(reqMsg 
message.RpcMessage, respMsg *
 func (g *GettyRemotingClient) syncCallback(reqMsg message.RpcMessage, respMsg 
*message.MessageFuture) (interface{}, error) {
        select {
        case <-gxtime.GetDefaultTimerWheel().After(RpcRequestTimeout):
-               GetGettyRemotingInstance().RemoveMergedMessageFuture(reqMsg.ID)
+               g.gettyRemoting.RemoveMergedMessageFuture(reqMsg.ID)
                log.Errorf("wait resp timeout: %#v", reqMsg)
                return nil, fmt.Errorf("wait response timeout, request: %#v", 
reqMsg)
        case <-respMsg.Done:
                return respMsg.Response, respMsg.Err
        }
 }
+
+func (client *GettyRemotingClient) GetMergedMessage(msgID int32) 
*message.MergedWarpMessage {
+       return client.gettyRemoting.GetMergedMessage(msgID)
+}
+
+func (client *GettyRemotingClient) GetMessageFuture(msgID int32) 
*message.MessageFuture {
+       return client.gettyRemoting.GetMessageFuture(msgID)
+}
+
+func (client *GettyRemotingClient) RemoveMessageFuture(msgID int32) {
+       client.gettyRemoting.RemoveMessageFuture(msgID)
+}
+
+func (client *GettyRemotingClient) RemoveMergedMessageFuture(msgID int32) {
+       client.gettyRemoting.RemoveMergedMessageFuture(msgID)
+}
+
+func (client *GettyRemotingClient) NotifyRpcMessageResponse(msg 
message.RpcMessage) {
+       client.gettyRemoting.NotifyRpcMessageResponse(msg)
+}
diff --git a/pkg/remoting/getty/getty_client_test.go 
b/pkg/remoting/getty/getty_client_test.go
index a5737283..e3e5e48d 100644
--- a/pkg/remoting/getty/getty_client_test.go
+++ b/pkg/remoting/getty/getty_client_test.go
@@ -40,7 +40,7 @@ func TestGettyRemotingClient_SendSyncRequest(t *testing.T) {
                        },
                },
        }
-       gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingInstance()), 
"SendSync",
+       
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingClient().gettyRemoting), 
"SendSync",
                func(_ *GettyRemoting, msg message.RpcMessage, s getty.Session, 
callback callbackMethod) (interface{},
                        error) {
                        return respMsg, nil
@@ -52,7 +52,7 @@ func TestGettyRemotingClient_SendSyncRequest(t *testing.T) {
 
 // TestGettyRemotingClient_SendAsyncResponse unit test for SendAsyncResponse 
function
 func TestGettyRemotingClient_SendAsyncResponse(t *testing.T) {
-       gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingInstance()), 
"SendASync",
+       
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingClient().gettyRemoting), 
"SendAsync",
                func(_ *GettyRemoting, msg message.RpcMessage, s getty.Session, 
callback callbackMethod) error {
                        return nil
                })
@@ -77,7 +77,7 @@ func TestGettyRemotingClient_SendAsyncRequest(t *testing.T) {
        }
        for _, test := range tests {
                t.Run(test.name, func(t *testing.T) {
-                       
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingInstance()), "SendASync",
+                       
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingClient().gettyRemoting), 
"SendAsync",
                                func(_ *GettyRemoting, msg message.RpcMessage, 
s getty.Session, callback callbackMethod) error {
                                        return nil
                                })
diff --git a/pkg/remoting/getty/getty_init.go b/pkg/remoting/getty/getty_init.go
new file mode 100644
index 00000000..37f71601
--- /dev/null
+++ b/pkg/remoting/getty/getty_init.go
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package getty
+
+import (
+       "seata.apache.org/seata-go/pkg/protocol/codec"
+       "seata.apache.org/seata-go/pkg/remoting/config"
+)
+
+func InitGetty(gettyConfig *config.Config, seataConfig *config.SeataConfig) {
+       config.InitConfig(seataConfig)
+       codec.Init()
+       initSessionManager(gettyConfig)
+}
diff --git a/pkg/remoting/getty/getty_remoting.go 
b/pkg/remoting/getty/getty_remoting.go
index bd378f64..3dec63c1 100644
--- a/pkg/remoting/getty/getty_remoting.go
+++ b/pkg/remoting/getty/getty_remoting.go
@@ -33,11 +33,6 @@ const (
        RpcRequestTimeout = 20 * time.Second
 )
 
-var (
-       gettyRemoting     *GettyRemoting
-       onceGettyRemoting = &sync.Once{}
-)
-
 type (
        callbackMethod func(reqMsg message.RpcMessage, respMsg 
*message.MessageFuture) (interface{}, error)
        GettyRemoting  struct {
@@ -46,16 +41,11 @@ type (
        }
 )
 
-func GetGettyRemotingInstance() *GettyRemoting {
-       if gettyRemoting == nil {
-               onceGettyRemoting.Do(func() {
-                       gettyRemoting = &GettyRemoting{
-                               futures:     &sync.Map{},
-                               mergeMsgMap: &sync.Map{},
-                       }
-               })
+func newGettyRemoting() *GettyRemoting {
+       return &GettyRemoting{
+               futures:     &sync.Map{},
+               mergeMsgMap: &sync.Map{},
        }
-       return gettyRemoting
 }
 
 func (g *GettyRemoting) SendSync(msg message.RpcMessage, s getty.Session, 
callback callbackMethod) (interface{}, error) {
@@ -72,7 +62,7 @@ func (g *GettyRemoting) SendSync(msg message.RpcMessage, s 
getty.Session, callba
        return result, err
 }
 
-func (g *GettyRemoting) SendASync(msg message.RpcMessage, s getty.Session, 
callback callbackMethod) error {
+func (g *GettyRemoting) SendAsync(msg message.RpcMessage, s getty.Session, 
callback callbackMethod) error {
        if s == nil {
                s = sessionManager.selectSession(msg)
        }
diff --git a/pkg/remoting/getty/getty_remoting_test.go 
b/pkg/remoting/getty/getty_remoting_test.go
index 8a3e9116..ef18719f 100644
--- a/pkg/remoting/getty/getty_remoting_test.go
+++ b/pkg/remoting/getty/getty_remoting_test.go
@@ -47,14 +47,15 @@ func TestGettyRemoting_GetMessageFuture(t *testing.T) {
                        },
                },
        }
+       gettyRemotingClient := GetGettyRemotingClient()
        for _, test := range tests {
                t.Run(test.name, func(t *testing.T) {
                        if test.messageFuture != nil {
-                               
GetGettyRemotingInstance().futures.Store(test.msgID, test.messageFuture)
-                               messageFuture := 
GetGettyRemotingInstance().GetMessageFuture(test.msgID)
+                               
gettyRemotingClient.gettyRemoting.futures.Store(test.msgID, test.messageFuture)
+                               messageFuture := 
gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
                                assert.Equal(t, *test.messageFuture, 
*messageFuture)
                        } else {
-                               messageFuture := 
GetGettyRemotingInstance().GetMessageFuture(test.msgID)
+                               messageFuture := 
gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
                                assert.Empty(t, messageFuture)
                        }
                })
@@ -78,13 +79,14 @@ func TestGettyRemoting_RemoveMessageFuture(t *testing.T) {
                        },
                },
        }
+       gettyRemotingClient := GetGettyRemotingClient()
        for _, test := range tests {
                t.Run(test.name, func(t *testing.T) {
-                       GetGettyRemotingInstance().futures.Store(test.msgID, 
test.messageFuture)
-                       messageFuture := 
GetGettyRemotingInstance().GetMessageFuture(test.msgID)
+                       
gettyRemotingClient.gettyRemoting.futures.Store(test.msgID, test.messageFuture)
+                       messageFuture := 
gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
                        assert.Equal(t, messageFuture, test.messageFuture)
-                       
GetGettyRemotingInstance().RemoveMessageFuture(test.msgID)
-                       messageFuture = 
GetGettyRemotingInstance().GetMessageFuture(test.msgID)
+                       
gettyRemotingClient.gettyRemoting.RemoveMessageFuture(test.msgID)
+                       messageFuture = 
gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
                        assert.Empty(t, messageFuture)
                })
        }
@@ -110,14 +112,15 @@ func TestGettyRemoting_GetMergedMessage(t *testing.T) {
                        },
                },
        }
+       gettyRemotingClient := GetGettyRemotingClient()
        for _, test := range tests {
                t.Run(test.name, func(t *testing.T) {
                        if test.mergedWarpMessage != nil {
-                               
GetGettyRemotingInstance().mergeMsgMap.Store(test.msgID, test.mergedWarpMessage)
-                               mergedWarpMessage := 
GetGettyRemotingInstance().GetMergedMessage(test.msgID)
+                               
gettyRemotingClient.gettyRemoting.mergeMsgMap.Store(test.msgID, 
test.mergedWarpMessage)
+                               mergedWarpMessage := 
gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID)
                                assert.Equal(t, *test.mergedWarpMessage, 
*mergedWarpMessage)
                        } else {
-                               mergedWarpMessage := 
GetGettyRemotingInstance().GetMessageFuture(test.msgID)
+                               mergedWarpMessage := 
gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
                                assert.Empty(t, mergedWarpMessage)
                        }
                })
@@ -144,18 +147,19 @@ func TestGettyRemoting_RemoveMergedMessageFuture(t 
*testing.T) {
                        },
                },
        }
+       gettyRemotingClient := GetGettyRemotingClient()
        for _, test := range tests {
                t.Run(test.name, func(t *testing.T) {
                        if test.mergedWarpMessage != nil {
-                               
GetGettyRemotingInstance().mergeMsgMap.Store(test.msgID, test.mergedWarpMessage)
-                               mergedWarpMessage := 
GetGettyRemotingInstance().GetMergedMessage(test.msgID)
+                               
gettyRemotingClient.gettyRemoting.mergeMsgMap.Store(test.msgID, 
test.mergedWarpMessage)
+                               mergedWarpMessage := 
gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID)
                                assert.NotEmpty(t, mergedWarpMessage)
-                               
GetGettyRemotingInstance().RemoveMergedMessageFuture(test.msgID)
-                               mergedWarpMessage = 
GetGettyRemotingInstance().GetMergedMessage(test.msgID)
+                               
gettyRemotingClient.gettyRemoting.RemoveMergedMessageFuture(test.msgID)
+                               mergedWarpMessage = 
gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID)
                                assert.Empty(t, mergedWarpMessage)
                        } else {
-                               
GetGettyRemotingInstance().RemoveMergedMessageFuture(test.msgID)
-                               mergedWarpMessage := 
GetGettyRemotingInstance().GetMergedMessage(test.msgID)
+                               
gettyRemotingClient.gettyRemoting.RemoveMergedMessageFuture(test.msgID)
+                               mergedWarpMessage := 
gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID)
                                assert.Empty(t, mergedWarpMessage)
                        }
                })
diff --git a/pkg/remoting/getty/listener.go b/pkg/remoting/getty/listener.go
index 2f2e9748..09886c3a 100644
--- a/pkg/remoting/getty/listener.go
+++ b/pkg/remoting/getty/listener.go
@@ -38,22 +38,16 @@ var (
 )
 
 type gettyClientHandler struct {
-       idGenerator    *atomic.Uint32
-       msgFutures     *sync.Map
-       mergeMsgMap    *sync.Map
-       sessionManager *SessionManager
-       processorMap   map[message.MessageType]processor.RemotingProcessor
+       idGenerator  *atomic.Uint32
+       processorMap map[message.MessageType]processor.RemotingProcessor
 }
 
 func GetGettyClientHandlerInstance() *gettyClientHandler {
        if clientHandler == nil {
                onceClientHandler.Do(func() {
                        clientHandler = &gettyClientHandler{
-                               idGenerator:    &atomic.Uint32{},
-                               msgFutures:     &sync.Map{},
-                               mergeMsgMap:    &sync.Map{},
-                               sessionManager: sessionManager,
-                               processorMap:   
make(map[message.MessageType]processor.RemotingProcessor, 0),
+                               idGenerator:  &atomic.Uint32{},
+                               processorMap: 
make(map[message.MessageType]processor.RemotingProcessor, 0),
                        }
                })
        }
@@ -62,7 +56,7 @@ func GetGettyClientHandlerInstance() *gettyClientHandler {
 
 func (g *gettyClientHandler) OnOpen(session getty.Session) error {
        log.Infof("Open new getty session ")
-       g.sessionManager.registerSession(session)
+       sessionManager.registerSession(session)
        conf := config.GetSeataConfig()
        go func() {
                request := message.RegisterTMRequest{AbstractIdentifyRequest: 
message.AbstractIdentifyRequest{
@@ -73,7 +67,7 @@ func (g *gettyClientHandler) OnOpen(session getty.Session) 
error {
                err := GetGettyRemotingClient().SendAsyncRequest(request)
                if err != nil {
                        log.Errorf("OnOpen error: {%#v}", err.Error())
-                       g.sessionManager.releaseSession(session)
+                       sessionManager.releaseSession(session)
                        return
                }
        }()
@@ -83,12 +77,12 @@ func (g *gettyClientHandler) OnOpen(session getty.Session) 
error {
 
 func (g *gettyClientHandler) OnError(session getty.Session, err error) {
        log.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), 
err)
-       g.sessionManager.releaseSession(session)
+       sessionManager.releaseSession(session)
 }
 
 func (g *gettyClientHandler) OnClose(session getty.Session) {
        log.Infof("session{%s} is closing......", session.Stat())
-       g.sessionManager.releaseSession(session)
+       sessionManager.releaseSession(session)
 }
 
 func (g *gettyClientHandler) OnMessage(session getty.Session, pkg interface{}) 
{
@@ -117,8 +111,19 @@ func (g *gettyClientHandler) OnCron(session getty.Session) 
{
        log.Debug("session{%s} Oncron executing", session.Stat())
        err := g.transferHeartBeat(session, message.HeartBeatMessagePing)
        if err != nil {
-               log.Errorf("failed to send heart beat: {%#v}", err.Error())
-               g.sessionManager.releaseSession(session)
+               log.Warnf("failed to send heart beat: {%#v}", err.Error())
+               if session.GetAttribute(heartBeatRetryTimesKey) != nil {
+                       retryTimes := 
session.GetAttribute(heartBeatRetryTimesKey).(int)
+                       if retryTimes >= maxHeartBeatRetryTimes {
+                               log.Warnf("heartbeat retry times exceed default 
max retry times{%d}, close the session{%s}",
+                                       maxHeartBeatRetryTimes, session.Stat())
+                               sessionManager.releaseSession(session)
+                               return
+                       }
+                       session.SetAttribute(heartBeatRetryTimesKey, 
retryTimes+1)
+               } else {
+                       session.SetAttribute(heartBeatRetryTimesKey, 1)
+               }
        }
 }
 
@@ -130,7 +135,7 @@ func (g *gettyClientHandler) transferHeartBeat(session 
getty.Session, msg messag
                Compressor: 0,
                Body:       msg,
        }
-       return GetGettyRemotingInstance().SendASync(rpcMessage, session, nil)
+       return GetGettyRemotingClient().gettyRemoting.SendAsync(rpcMessage, 
session, nil)
 }
 
 func (g *gettyClientHandler) RegisterProcessor(msgType message.MessageType, 
processor processor.RemotingProcessor) {
diff --git a/pkg/remoting/getty/rpc_client.go b/pkg/remoting/getty/rpc_client.go
deleted file mode 100644
index 17b6cec3..00000000
--- a/pkg/remoting/getty/rpc_client.go
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package getty
-
-import (
-       "crypto/tls"
-       "fmt"
-       "net"
-       "sync"
-
-       getty "github.com/apache/dubbo-getty"
-       gxsync "github.com/dubbogo/gost/sync"
-
-       "seata.apache.org/seata-go/pkg/discovery"
-       "seata.apache.org/seata-go/pkg/protocol/codec"
-       "seata.apache.org/seata-go/pkg/remoting/config"
-       "seata.apache.org/seata-go/pkg/util/log"
-)
-
-type RpcClient struct {
-       gettyConf    *config.Config
-       seataConf    *config.SeataConfig
-       gettyClients []getty.Client
-       futures      *sync.Map
-}
-
-func InitRpcClient(gettyConfig *config.Config, seataConfig 
*config.SeataConfig) {
-       config.IniConfig(seataConfig)
-       rpcClient := &RpcClient{
-               gettyConf:    gettyConfig,
-               seataConf:    seataConfig,
-               gettyClients: make([]getty.Client, 0),
-       }
-       codec.Init()
-       rpcClient.init()
-}
-
-func (c *RpcClient) init() {
-       addressList := c.getAvailServerList()
-       if len(addressList) == 0 {
-               log.Warn("no have valid seata server list")
-       }
-       for _, address := range addressList {
-               gettyClient := getty.NewTCPClient(
-                       getty.WithServerAddress(fmt.Sprintf("%s:%d", 
address.Addr, address.Port)),
-                       // todo if read c.gettyConf.ConnectionNum, will cause 
the connect to fail
-                       getty.WithConnectionNumber(1),
-                       
getty.WithReconnectInterval(c.gettyConf.ReconnectInterval),
-                       getty.WithClientTaskPool(gxsync.NewTaskPoolSimple(0)),
-               )
-               go gettyClient.RunEventLoop(c.newSession)
-               // c.gettyClients = append(c.gettyClients, gettyClient)
-       }
-}
-
-func (c *RpcClient) getAvailServerList() []*discovery.ServiceInstance {
-       registryService := discovery.GetRegistry()
-       instances, err := registryService.Lookup(c.seataConf.TxServiceGroup)
-       if err != nil {
-               return nil
-       }
-       return instances
-}
-
-func (c *RpcClient) newSession(session getty.Session) error {
-       var (
-               ok      bool
-               tcpConn *net.TCPConn
-               err     error
-       )
-
-       if c.gettyConf.SessionConfig.CompressEncoding {
-               session.SetCompressType(getty.CompressZip)
-       }
-       if _, ok = session.Conn().(*tls.Conn); ok {
-               c.setSessionConfig(session)
-               log.Debugf("server accepts new tls session:%s\n", 
session.Stat())
-               return nil
-       }
-       if _, ok = session.Conn().(*net.TCPConn); !ok {
-               panic(fmt.Sprintf("%s, session.conn{%#v} is not a tcp 
connection\n", session.Stat(), session.Conn()))
-       }
-
-       if _, ok = session.Conn().(*tls.Conn); !ok {
-               if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
-                       return fmt.Errorf("%s, session.conn{%#v} is not tcp 
connection", session.Stat(), session.Conn())
-               }
-
-               if err = 
tcpConn.SetNoDelay(c.gettyConf.SessionConfig.TCPNoDelay); err != nil {
-                       return err
-               }
-               if err = 
tcpConn.SetKeepAlive(c.gettyConf.SessionConfig.TCPKeepAlive); err != nil {
-                       return err
-               }
-               if c.gettyConf.SessionConfig.TCPKeepAlive {
-                       if err = 
tcpConn.SetKeepAlivePeriod(c.gettyConf.SessionConfig.KeepAlivePeriod); err != 
nil {
-                               return err
-                       }
-               }
-               if err = 
tcpConn.SetReadBuffer(c.gettyConf.SessionConfig.TCPRBufSize); err != nil {
-                       return err
-               }
-               if err = 
tcpConn.SetWriteBuffer(c.gettyConf.SessionConfig.TCPWBufSize); err != nil {
-                       return err
-               }
-       }
-
-       c.setSessionConfig(session)
-       log.Debugf("rpc_client new session:%s\n", session.Stat())
-
-       return nil
-}
-
-func (c *RpcClient) setSessionConfig(session getty.Session) {
-       session.SetName(c.gettyConf.SessionConfig.SessionName)
-       session.SetMaxMsgLen(c.gettyConf.SessionConfig.MaxMsgLen)
-       session.SetPkgHandler(rpcPkgHandler)
-       session.SetEventListener(GetGettyClientHandlerInstance())
-       session.SetReadTimeout(c.gettyConf.SessionConfig.TCPReadTimeout)
-       session.SetWriteTimeout(c.gettyConf.SessionConfig.TCPWriteTimeout)
-       
session.SetCronPeriod((int)(c.gettyConf.SessionConfig.CronPeriod.Milliseconds()))
-       session.SetWaitTime(c.gettyConf.SessionConfig.WaitTimeout)
-}
diff --git a/pkg/remoting/getty/session_manager.go 
b/pkg/remoting/getty/session_manager.go
index 6a68cf1b..602fffed 100644
--- a/pkg/remoting/getty/session_manager.go
+++ b/pkg/remoting/getty/session_manager.go
@@ -18,38 +18,142 @@
 package getty
 
 import (
+       "crypto/tls"
+       "fmt"
+       "net"
        "reflect"
        "sync"
        "sync/atomic"
        "time"
 
        getty "github.com/apache/dubbo-getty"
+       gxsync "github.com/dubbogo/gost/sync"
 
+       "seata.apache.org/seata-go/pkg/discovery"
        "seata.apache.org/seata-go/pkg/protocol/message"
        "seata.apache.org/seata-go/pkg/remoting/config"
        "seata.apache.org/seata-go/pkg/remoting/loadbalance"
+       "seata.apache.org/seata-go/pkg/util/log"
 )
 
 const (
-       maxCheckAliveRetry = 600
-       checkAliveInternal = 100
+       maxCheckAliveRetry     = 600
+       checkAliveInternal     = 100
+       heartBeatRetryTimesKey = "heartbeat-retry-times"
+       maxHeartBeatRetryTimes = 3
 )
 
-var sessionManager = newSessionManager()
+var (
+       sessionManager     *SessionManager
+       onceSessionManager = &sync.Once{}
+)
 
 type SessionManager struct {
        // serverAddress -> rpc_client.Session -> bool
        serverSessions sync.Map
        allSessions    sync.Map
        sessionSize    int32
+       gettyConf      *config.Config
+}
+
+func initSessionManager(gettyConfig *config.Config) {
+       if sessionManager == nil {
+               onceSessionManager.Do(func() {
+                       sessionManager = &SessionManager{
+                               allSessions:    sync.Map{},
+                               serverSessions: sync.Map{},
+                               gettyConf:      gettyConfig,
+                       }
+                       sessionManager.init()
+               })
+       }
 }
 
-func newSessionManager() *SessionManager {
-       return &SessionManager{
-               allSessions: sync.Map{},
-               // serverAddress -> rpc_client.Session -> bool
-               serverSessions: sync.Map{},
+func (g *SessionManager) init() {
+       addressList := g.getAvailServerList()
+       if len(addressList) == 0 {
+               log.Warn("no have valid seata server list")
+       }
+       for _, address := range addressList {
+               gettyClient := getty.NewTCPClient(
+                       getty.WithServerAddress(fmt.Sprintf("%s:%d", 
address.Addr, address.Port)),
+                       // todo if read c.gettyConf.ConnectionNum, will cause 
the connect to fail
+                       getty.WithConnectionNumber(1),
+                       
getty.WithReconnectInterval(g.gettyConf.ReconnectInterval),
+                       getty.WithClientTaskPool(gxsync.NewTaskPoolSimple(0)),
+               )
+               go gettyClient.RunEventLoop(g.newSession)
+       }
+}
+
+func (g *SessionManager) getAvailServerList() []*discovery.ServiceInstance {
+       registryService := discovery.GetRegistry()
+       instances, err := 
registryService.Lookup(config.GetSeataConfig().TxServiceGroup)
+       if err != nil {
+               return nil
+       }
+       return instances
+}
+
+func (g *SessionManager) setSessionConfig(session getty.Session) {
+       session.SetName(g.gettyConf.SessionConfig.SessionName)
+       session.SetMaxMsgLen(g.gettyConf.SessionConfig.MaxMsgLen)
+       session.SetPkgHandler(rpcPkgHandler)
+       session.SetEventListener(GetGettyClientHandlerInstance())
+       session.SetReadTimeout(g.gettyConf.SessionConfig.TCPReadTimeout)
+       session.SetWriteTimeout(g.gettyConf.SessionConfig.TCPWriteTimeout)
+       
session.SetCronPeriod((int)(g.gettyConf.SessionConfig.CronPeriod.Milliseconds()))
+       session.SetWaitTime(g.gettyConf.SessionConfig.WaitTimeout)
+       session.SetAttribute(heartBeatRetryTimesKey, 0)
+}
+
+func (g *SessionManager) newSession(session getty.Session) error {
+       var (
+               ok      bool
+               tcpConn *net.TCPConn
+               err     error
+       )
+
+       if g.gettyConf.SessionConfig.CompressEncoding {
+               session.SetCompressType(getty.CompressZip)
        }
+       if _, ok = session.Conn().(*tls.Conn); ok {
+               g.setSessionConfig(session)
+               log.Debugf("server accepts new tls session:%s\n", 
session.Stat())
+               return nil
+       }
+       if _, ok = session.Conn().(*net.TCPConn); !ok {
+               panic(fmt.Sprintf("%s, session.conn{%#v} is not a tcp 
connection\n", session.Stat(), session.Conn()))
+       }
+
+       if _, ok = session.Conn().(*tls.Conn); !ok {
+               if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
+                       return fmt.Errorf("%s, session.conn{%#v} is not tcp 
connection", session.Stat(), session.Conn())
+               }
+
+               if err = 
tcpConn.SetNoDelay(g.gettyConf.SessionConfig.TCPNoDelay); err != nil {
+                       return err
+               }
+               if err = 
tcpConn.SetKeepAlive(g.gettyConf.SessionConfig.TCPKeepAlive); err != nil {
+                       return err
+               }
+               if g.gettyConf.SessionConfig.TCPKeepAlive {
+                       if err = 
tcpConn.SetKeepAlivePeriod(g.gettyConf.SessionConfig.KeepAlivePeriod); err != 
nil {
+                               return err
+                       }
+               }
+               if err = 
tcpConn.SetReadBuffer(g.gettyConf.SessionConfig.TCPRBufSize); err != nil {
+                       return err
+               }
+               if err = 
tcpConn.SetWriteBuffer(g.gettyConf.SessionConfig.TCPWBufSize); err != nil {
+                       return err
+               }
+       }
+
+       g.setSessionConfig(session)
+       log.Debugf("rpc_client new session:%s\n", session.Stat())
+
+       return nil
 }
 
 func (g *SessionManager) selectSession(msg interface{}) getty.Session {
diff --git a/pkg/remoting/processor/client/client_on_response_processor.go 
b/pkg/remoting/processor/client/client_on_response_processor.go
index 55644b77..ada44961 100644
--- a/pkg/remoting/processor/client/client_on_response_processor.go
+++ b/pkg/remoting/processor/client/client_on_response_processor.go
@@ -46,27 +46,28 @@ type clientOnResponseProcessor struct{}
 
 func (f *clientOnResponseProcessor) Process(ctx context.Context, rpcMessage 
message.RpcMessage) error {
        log.Infof("the rm client received  clientOnResponse msg %#v from tc 
server.", rpcMessage)
+       gettyRemotingClient := getty.GetGettyRemotingClient()
        if mergedResult, ok := rpcMessage.Body.(message.MergeResultMessage); ok 
{
-               mergedMessage := 
getty.GetGettyRemotingInstance().GetMergedMessage(rpcMessage.ID)
+               mergedMessage := 
gettyRemotingClient.GetMergedMessage(rpcMessage.ID)
                if mergedMessage != nil {
                        for i := 0; i < len(mergedMessage.Msgs); i++ {
                                msgID := mergedMessage.MsgIds[i]
-                               response := 
getty.GetGettyRemotingInstance().GetMessageFuture(msgID)
+                               response := 
gettyRemotingClient.GetMessageFuture(msgID)
                                if response != nil {
                                        response.Response = mergedResult.Msgs[i]
                                        response.Done <- struct{}{}
-                                       
getty.GetGettyRemotingInstance().RemoveMessageFuture(msgID)
+                                       
gettyRemotingClient.RemoveMessageFuture(msgID)
                                }
                        }
-                       
getty.GetGettyRemotingInstance().RemoveMergedMessageFuture(rpcMessage.ID)
+                       
gettyRemotingClient.RemoveMergedMessageFuture(rpcMessage.ID)
                }
                return nil
        } else {
                // 如果是请求消息,做处理逻辑
-               msgFuture := 
getty.GetGettyRemotingInstance().GetMessageFuture(rpcMessage.ID)
+               msgFuture := gettyRemotingClient.GetMessageFuture(rpcMessage.ID)
                if msgFuture != nil {
-                       
getty.GetGettyRemotingInstance().NotifyRpcMessageResponse(rpcMessage)
-                       
getty.GetGettyRemotingInstance().RemoveMessageFuture(rpcMessage.ID)
+                       gettyRemotingClient.NotifyRpcMessageResponse(rpcMessage)
+                       gettyRemotingClient.RemoveMessageFuture(rpcMessage.ID)
                } else {
                        if _, ok := 
rpcMessage.Body.(message.AbstractResultMessage); ok {
                                log.Infof("the rm client received response msg 
[{}] from tc server.", msgFuture)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to