This is an automated email from the ASF dual-hosted git repository.
luky116 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git
The following commit(s) were added to refs/heads/master by this push:
new 8ce10062 refactor: refact getty module (#746)
8ce10062 is described below
commit 8ce100621fedcc7ce3bdb4222fe11049e54bbdd4
Author: xinfan.wu <[email protected]>
AuthorDate: Sat Dec 21 15:32:32 2024 +0800
refactor: refact getty module (#746)
* feat:add more linter
* feat:change golangclilint version to 1.57.x to support more linter
* feat:adjust lint conf and adjust the code to pass the check
* style: format some code; fix: some sql statement or rows was not been
closed
* feat:close session when send heart beat message failed
* refactor: change GettyRemoting from singleton to GettyRemotingClient
member
* remove rpc client
* refactor: complete GettyRemoting refactoring
* refactor: change getter name, improve variable naming and remove unused
fields
* update something
* feat:add heart-beat failed retry times
* Refactor getty.
>
>
Co-authored-by: marsevilspirit <[email protected]>
Co-authored-by: solisamicus <[email protected]>
Co-authored-by: No-SilverBullet <[email protected]>
* Refactor getty.
Co-authored-by: marsevilspirit <[email protected]>
Co-authored-by: solisamicus <[email protected]>
Co-authored-by: No-SilverBullet <[email protected]>
---------
Co-authored-by: JayLiu <[email protected]>
Co-authored-by: SolisAmicus <[email protected]>
Co-authored-by: marsevilspirit <[email protected]>
---
pkg/client/client.go | 8 +-
pkg/remoting/config/config.go | 2 +-
pkg/remoting/getty/getty_client.go | 34 ++++-
pkg/remoting/getty/getty_client_test.go | 6 +-
pkg/remoting/getty/getty_init.go | 29 +++++
pkg/remoting/getty/getty_remoting.go | 20 +--
pkg/remoting/getty/getty_remoting_test.go | 36 +++---
pkg/remoting/getty/listener.go | 39 +++---
pkg/remoting/getty/rpc_client.go | 138 ---------------------
pkg/remoting/getty/session_manager.go | 120 ++++++++++++++++--
.../client/client_on_response_processor.go | 15 +--
11 files changed, 233 insertions(+), 214 deletions(-)
diff --git a/pkg/client/client.go b/pkg/client/client.go
index 5c119187..2b3cd38a 100644
--- a/pkg/client/client.go
+++ b/pkg/client/client.go
@@ -62,15 +62,17 @@ func initTmClient(cfg *Config) {
})
}
-// initRemoting init rpc client
+// initRemoting init remoting
func initRemoting(cfg *Config) {
- getty.InitRpcClient(&cfg.GettyConfig, &remoteConfig.SeataConfig{
+ seataConfig := remoteConfig.SeataConfig{
ApplicationID: cfg.ApplicationID,
TxServiceGroup: cfg.TxServiceGroup,
ServiceVgroupMapping: cfg.ServiceConfig.VgroupMapping,
ServiceGrouplist: cfg.ServiceConfig.Grouplist,
LoadBalanceType: cfg.GettyConfig.LoadBalanceType,
- })
+ }
+
+ getty.InitGetty(&cfg.GettyConfig, &seataConfig)
}
// InitRmClient init client rm client
diff --git a/pkg/remoting/config/config.go b/pkg/remoting/config/config.go
index 5bb12667..823440c7 100644
--- a/pkg/remoting/config/config.go
+++ b/pkg/remoting/config/config.go
@@ -84,7 +84,7 @@ type SeataConfig struct {
LoadBalanceType string
}
-func IniConfig(seataConf *SeataConfig) {
+func InitConfig(seataConf *SeataConfig) {
seataConfig = seataConf
}
diff --git a/pkg/remoting/getty/getty_client.go
b/pkg/remoting/getty/getty_client.go
index 72874b0f..470f2a4f 100644
--- a/pkg/remoting/getty/getty_client.go
+++ b/pkg/remoting/getty/getty_client.go
@@ -35,14 +35,16 @@ var (
)
type GettyRemotingClient struct {
- idGenerator *atomic.Uint32
+ idGenerator *atomic.Uint32
+ gettyRemoting *GettyRemoting
}
func GetGettyRemotingClient() *GettyRemotingClient {
if gettyRemotingClient == nil {
onceGettyRemotingClient.Do(func() {
gettyRemotingClient = &GettyRemotingClient{
- idGenerator: &atomic.Uint32{},
+ idGenerator: &atomic.Uint32{},
+ gettyRemoting: newGettyRemoting(),
}
})
}
@@ -63,7 +65,7 @@ func (client *GettyRemotingClient) SendAsyncRequest(msg
interface{}) error {
Compressor: 0,
Body: msg,
}
- return GetGettyRemotingInstance().SendASync(rpcMessage, nil,
client.asyncCallback)
+ return client.gettyRemoting.SendAsync(rpcMessage, nil,
client.asyncCallback)
}
func (client *GettyRemotingClient) SendAsyncResponse(msgID int32, msg
interface{}) error {
@@ -74,7 +76,7 @@ func (client *GettyRemotingClient) SendAsyncResponse(msgID
int32, msg interface{
Compressor: 0,
Body: msg,
}
- return GetGettyRemotingInstance().SendASync(rpcMessage, nil, nil)
+ return client.gettyRemoting.SendAsync(rpcMessage, nil, nil)
}
func (client *GettyRemotingClient) SendSyncRequest(msg interface{})
(interface{}, error) {
@@ -85,7 +87,7 @@ func (client *GettyRemotingClient) SendSyncRequest(msg
interface{}) (interface{}
Compressor: 0,
Body: msg,
}
- return GetGettyRemotingInstance().SendSync(rpcMessage, nil,
client.syncCallback)
+ return client.gettyRemoting.SendSync(rpcMessage, nil,
client.syncCallback)
}
func (g *GettyRemotingClient) asyncCallback(reqMsg message.RpcMessage, respMsg
*message.MessageFuture) (interface{}, error) {
@@ -96,10 +98,30 @@ func (g *GettyRemotingClient) asyncCallback(reqMsg
message.RpcMessage, respMsg *
func (g *GettyRemotingClient) syncCallback(reqMsg message.RpcMessage, respMsg
*message.MessageFuture) (interface{}, error) {
select {
case <-gxtime.GetDefaultTimerWheel().After(RpcRequestTimeout):
- GetGettyRemotingInstance().RemoveMergedMessageFuture(reqMsg.ID)
+ g.gettyRemoting.RemoveMergedMessageFuture(reqMsg.ID)
log.Errorf("wait resp timeout: %#v", reqMsg)
return nil, fmt.Errorf("wait response timeout, request: %#v",
reqMsg)
case <-respMsg.Done:
return respMsg.Response, respMsg.Err
}
}
+
+func (client *GettyRemotingClient) GetMergedMessage(msgID int32)
*message.MergedWarpMessage {
+ return client.gettyRemoting.GetMergedMessage(msgID)
+}
+
+func (client *GettyRemotingClient) GetMessageFuture(msgID int32)
*message.MessageFuture {
+ return client.gettyRemoting.GetMessageFuture(msgID)
+}
+
+func (client *GettyRemotingClient) RemoveMessageFuture(msgID int32) {
+ client.gettyRemoting.RemoveMessageFuture(msgID)
+}
+
+func (client *GettyRemotingClient) RemoveMergedMessageFuture(msgID int32) {
+ client.gettyRemoting.RemoveMergedMessageFuture(msgID)
+}
+
+func (client *GettyRemotingClient) NotifyRpcMessageResponse(msg
message.RpcMessage) {
+ client.gettyRemoting.NotifyRpcMessageResponse(msg)
+}
diff --git a/pkg/remoting/getty/getty_client_test.go
b/pkg/remoting/getty/getty_client_test.go
index a5737283..e3e5e48d 100644
--- a/pkg/remoting/getty/getty_client_test.go
+++ b/pkg/remoting/getty/getty_client_test.go
@@ -40,7 +40,7 @@ func TestGettyRemotingClient_SendSyncRequest(t *testing.T) {
},
},
}
- gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingInstance()),
"SendSync",
+
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingClient().gettyRemoting),
"SendSync",
func(_ *GettyRemoting, msg message.RpcMessage, s getty.Session,
callback callbackMethod) (interface{},
error) {
return respMsg, nil
@@ -52,7 +52,7 @@ func TestGettyRemotingClient_SendSyncRequest(t *testing.T) {
// TestGettyRemotingClient_SendAsyncResponse unit test for SendAsyncResponse
function
func TestGettyRemotingClient_SendAsyncResponse(t *testing.T) {
- gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingInstance()),
"SendASync",
+
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingClient().gettyRemoting),
"SendAsync",
func(_ *GettyRemoting, msg message.RpcMessage, s getty.Session,
callback callbackMethod) error {
return nil
})
@@ -77,7 +77,7 @@ func TestGettyRemotingClient_SendAsyncRequest(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
-
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingInstance()), "SendASync",
+
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingClient().gettyRemoting),
"SendAsync",
func(_ *GettyRemoting, msg message.RpcMessage,
s getty.Session, callback callbackMethod) error {
return nil
})
diff --git a/pkg/remoting/getty/getty_init.go b/pkg/remoting/getty/getty_init.go
new file mode 100644
index 00000000..37f71601
--- /dev/null
+++ b/pkg/remoting/getty/getty_init.go
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package getty
+
+import (
+ "seata.apache.org/seata-go/pkg/protocol/codec"
+ "seata.apache.org/seata-go/pkg/remoting/config"
+)
+
+func InitGetty(gettyConfig *config.Config, seataConfig *config.SeataConfig) {
+ config.InitConfig(seataConfig)
+ codec.Init()
+ initSessionManager(gettyConfig)
+}
diff --git a/pkg/remoting/getty/getty_remoting.go
b/pkg/remoting/getty/getty_remoting.go
index bd378f64..3dec63c1 100644
--- a/pkg/remoting/getty/getty_remoting.go
+++ b/pkg/remoting/getty/getty_remoting.go
@@ -33,11 +33,6 @@ const (
RpcRequestTimeout = 20 * time.Second
)
-var (
- gettyRemoting *GettyRemoting
- onceGettyRemoting = &sync.Once{}
-)
-
type (
callbackMethod func(reqMsg message.RpcMessage, respMsg
*message.MessageFuture) (interface{}, error)
GettyRemoting struct {
@@ -46,16 +41,11 @@ type (
}
)
-func GetGettyRemotingInstance() *GettyRemoting {
- if gettyRemoting == nil {
- onceGettyRemoting.Do(func() {
- gettyRemoting = &GettyRemoting{
- futures: &sync.Map{},
- mergeMsgMap: &sync.Map{},
- }
- })
+func newGettyRemoting() *GettyRemoting {
+ return &GettyRemoting{
+ futures: &sync.Map{},
+ mergeMsgMap: &sync.Map{},
}
- return gettyRemoting
}
func (g *GettyRemoting) SendSync(msg message.RpcMessage, s getty.Session,
callback callbackMethod) (interface{}, error) {
@@ -72,7 +62,7 @@ func (g *GettyRemoting) SendSync(msg message.RpcMessage, s
getty.Session, callba
return result, err
}
-func (g *GettyRemoting) SendASync(msg message.RpcMessage, s getty.Session,
callback callbackMethod) error {
+func (g *GettyRemoting) SendAsync(msg message.RpcMessage, s getty.Session,
callback callbackMethod) error {
if s == nil {
s = sessionManager.selectSession(msg)
}
diff --git a/pkg/remoting/getty/getty_remoting_test.go
b/pkg/remoting/getty/getty_remoting_test.go
index 8a3e9116..ef18719f 100644
--- a/pkg/remoting/getty/getty_remoting_test.go
+++ b/pkg/remoting/getty/getty_remoting_test.go
@@ -47,14 +47,15 @@ func TestGettyRemoting_GetMessageFuture(t *testing.T) {
},
},
}
+ gettyRemotingClient := GetGettyRemotingClient()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.messageFuture != nil {
-
GetGettyRemotingInstance().futures.Store(test.msgID, test.messageFuture)
- messageFuture :=
GetGettyRemotingInstance().GetMessageFuture(test.msgID)
+
gettyRemotingClient.gettyRemoting.futures.Store(test.msgID, test.messageFuture)
+ messageFuture :=
gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
assert.Equal(t, *test.messageFuture,
*messageFuture)
} else {
- messageFuture :=
GetGettyRemotingInstance().GetMessageFuture(test.msgID)
+ messageFuture :=
gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
assert.Empty(t, messageFuture)
}
})
@@ -78,13 +79,14 @@ func TestGettyRemoting_RemoveMessageFuture(t *testing.T) {
},
},
}
+ gettyRemotingClient := GetGettyRemotingClient()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
- GetGettyRemotingInstance().futures.Store(test.msgID,
test.messageFuture)
- messageFuture :=
GetGettyRemotingInstance().GetMessageFuture(test.msgID)
+
gettyRemotingClient.gettyRemoting.futures.Store(test.msgID, test.messageFuture)
+ messageFuture :=
gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
assert.Equal(t, messageFuture, test.messageFuture)
-
GetGettyRemotingInstance().RemoveMessageFuture(test.msgID)
- messageFuture =
GetGettyRemotingInstance().GetMessageFuture(test.msgID)
+
gettyRemotingClient.gettyRemoting.RemoveMessageFuture(test.msgID)
+ messageFuture =
gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
assert.Empty(t, messageFuture)
})
}
@@ -110,14 +112,15 @@ func TestGettyRemoting_GetMergedMessage(t *testing.T) {
},
},
}
+ gettyRemotingClient := GetGettyRemotingClient()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.mergedWarpMessage != nil {
-
GetGettyRemotingInstance().mergeMsgMap.Store(test.msgID, test.mergedWarpMessage)
- mergedWarpMessage :=
GetGettyRemotingInstance().GetMergedMessage(test.msgID)
+
gettyRemotingClient.gettyRemoting.mergeMsgMap.Store(test.msgID,
test.mergedWarpMessage)
+ mergedWarpMessage :=
gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID)
assert.Equal(t, *test.mergedWarpMessage,
*mergedWarpMessage)
} else {
- mergedWarpMessage :=
GetGettyRemotingInstance().GetMessageFuture(test.msgID)
+ mergedWarpMessage :=
gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
assert.Empty(t, mergedWarpMessage)
}
})
@@ -144,18 +147,19 @@ func TestGettyRemoting_RemoveMergedMessageFuture(t
*testing.T) {
},
},
}
+ gettyRemotingClient := GetGettyRemotingClient()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.mergedWarpMessage != nil {
-
GetGettyRemotingInstance().mergeMsgMap.Store(test.msgID, test.mergedWarpMessage)
- mergedWarpMessage :=
GetGettyRemotingInstance().GetMergedMessage(test.msgID)
+
gettyRemotingClient.gettyRemoting.mergeMsgMap.Store(test.msgID,
test.mergedWarpMessage)
+ mergedWarpMessage :=
gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID)
assert.NotEmpty(t, mergedWarpMessage)
-
GetGettyRemotingInstance().RemoveMergedMessageFuture(test.msgID)
- mergedWarpMessage =
GetGettyRemotingInstance().GetMergedMessage(test.msgID)
+
gettyRemotingClient.gettyRemoting.RemoveMergedMessageFuture(test.msgID)
+ mergedWarpMessage =
gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID)
assert.Empty(t, mergedWarpMessage)
} else {
-
GetGettyRemotingInstance().RemoveMergedMessageFuture(test.msgID)
- mergedWarpMessage :=
GetGettyRemotingInstance().GetMergedMessage(test.msgID)
+
gettyRemotingClient.gettyRemoting.RemoveMergedMessageFuture(test.msgID)
+ mergedWarpMessage :=
gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID)
assert.Empty(t, mergedWarpMessage)
}
})
diff --git a/pkg/remoting/getty/listener.go b/pkg/remoting/getty/listener.go
index 2f2e9748..09886c3a 100644
--- a/pkg/remoting/getty/listener.go
+++ b/pkg/remoting/getty/listener.go
@@ -38,22 +38,16 @@ var (
)
type gettyClientHandler struct {
- idGenerator *atomic.Uint32
- msgFutures *sync.Map
- mergeMsgMap *sync.Map
- sessionManager *SessionManager
- processorMap map[message.MessageType]processor.RemotingProcessor
+ idGenerator *atomic.Uint32
+ processorMap map[message.MessageType]processor.RemotingProcessor
}
func GetGettyClientHandlerInstance() *gettyClientHandler {
if clientHandler == nil {
onceClientHandler.Do(func() {
clientHandler = &gettyClientHandler{
- idGenerator: &atomic.Uint32{},
- msgFutures: &sync.Map{},
- mergeMsgMap: &sync.Map{},
- sessionManager: sessionManager,
- processorMap:
make(map[message.MessageType]processor.RemotingProcessor, 0),
+ idGenerator: &atomic.Uint32{},
+ processorMap:
make(map[message.MessageType]processor.RemotingProcessor, 0),
}
})
}
@@ -62,7 +56,7 @@ func GetGettyClientHandlerInstance() *gettyClientHandler {
func (g *gettyClientHandler) OnOpen(session getty.Session) error {
log.Infof("Open new getty session ")
- g.sessionManager.registerSession(session)
+ sessionManager.registerSession(session)
conf := config.GetSeataConfig()
go func() {
request := message.RegisterTMRequest{AbstractIdentifyRequest:
message.AbstractIdentifyRequest{
@@ -73,7 +67,7 @@ func (g *gettyClientHandler) OnOpen(session getty.Session)
error {
err := GetGettyRemotingClient().SendAsyncRequest(request)
if err != nil {
log.Errorf("OnOpen error: {%#v}", err.Error())
- g.sessionManager.releaseSession(session)
+ sessionManager.releaseSession(session)
return
}
}()
@@ -83,12 +77,12 @@ func (g *gettyClientHandler) OnOpen(session getty.Session)
error {
func (g *gettyClientHandler) OnError(session getty.Session, err error) {
log.Infof("session{%s} got error{%v}, will be closed.", session.Stat(),
err)
- g.sessionManager.releaseSession(session)
+ sessionManager.releaseSession(session)
}
func (g *gettyClientHandler) OnClose(session getty.Session) {
log.Infof("session{%s} is closing......", session.Stat())
- g.sessionManager.releaseSession(session)
+ sessionManager.releaseSession(session)
}
func (g *gettyClientHandler) OnMessage(session getty.Session, pkg interface{})
{
@@ -117,8 +111,19 @@ func (g *gettyClientHandler) OnCron(session getty.Session)
{
log.Debug("session{%s} Oncron executing", session.Stat())
err := g.transferHeartBeat(session, message.HeartBeatMessagePing)
if err != nil {
- log.Errorf("failed to send heart beat: {%#v}", err.Error())
- g.sessionManager.releaseSession(session)
+ log.Warnf("failed to send heart beat: {%#v}", err.Error())
+ if session.GetAttribute(heartBeatRetryTimesKey) != nil {
+ retryTimes :=
session.GetAttribute(heartBeatRetryTimesKey).(int)
+ if retryTimes >= maxHeartBeatRetryTimes {
+ log.Warnf("heartbeat retry times exceed default
max retry times{%d}, close the session{%s}",
+ maxHeartBeatRetryTimes, session.Stat())
+ sessionManager.releaseSession(session)
+ return
+ }
+ session.SetAttribute(heartBeatRetryTimesKey,
retryTimes+1)
+ } else {
+ session.SetAttribute(heartBeatRetryTimesKey, 1)
+ }
}
}
@@ -130,7 +135,7 @@ func (g *gettyClientHandler) transferHeartBeat(session
getty.Session, msg messag
Compressor: 0,
Body: msg,
}
- return GetGettyRemotingInstance().SendASync(rpcMessage, session, nil)
+ return GetGettyRemotingClient().gettyRemoting.SendAsync(rpcMessage,
session, nil)
}
func (g *gettyClientHandler) RegisterProcessor(msgType message.MessageType,
processor processor.RemotingProcessor) {
diff --git a/pkg/remoting/getty/rpc_client.go b/pkg/remoting/getty/rpc_client.go
deleted file mode 100644
index 17b6cec3..00000000
--- a/pkg/remoting/getty/rpc_client.go
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package getty
-
-import (
- "crypto/tls"
- "fmt"
- "net"
- "sync"
-
- getty "github.com/apache/dubbo-getty"
- gxsync "github.com/dubbogo/gost/sync"
-
- "seata.apache.org/seata-go/pkg/discovery"
- "seata.apache.org/seata-go/pkg/protocol/codec"
- "seata.apache.org/seata-go/pkg/remoting/config"
- "seata.apache.org/seata-go/pkg/util/log"
-)
-
-type RpcClient struct {
- gettyConf *config.Config
- seataConf *config.SeataConfig
- gettyClients []getty.Client
- futures *sync.Map
-}
-
-func InitRpcClient(gettyConfig *config.Config, seataConfig
*config.SeataConfig) {
- config.IniConfig(seataConfig)
- rpcClient := &RpcClient{
- gettyConf: gettyConfig,
- seataConf: seataConfig,
- gettyClients: make([]getty.Client, 0),
- }
- codec.Init()
- rpcClient.init()
-}
-
-func (c *RpcClient) init() {
- addressList := c.getAvailServerList()
- if len(addressList) == 0 {
- log.Warn("no have valid seata server list")
- }
- for _, address := range addressList {
- gettyClient := getty.NewTCPClient(
- getty.WithServerAddress(fmt.Sprintf("%s:%d",
address.Addr, address.Port)),
- // todo if read c.gettyConf.ConnectionNum, will cause
the connect to fail
- getty.WithConnectionNumber(1),
-
getty.WithReconnectInterval(c.gettyConf.ReconnectInterval),
- getty.WithClientTaskPool(gxsync.NewTaskPoolSimple(0)),
- )
- go gettyClient.RunEventLoop(c.newSession)
- // c.gettyClients = append(c.gettyClients, gettyClient)
- }
-}
-
-func (c *RpcClient) getAvailServerList() []*discovery.ServiceInstance {
- registryService := discovery.GetRegistry()
- instances, err := registryService.Lookup(c.seataConf.TxServiceGroup)
- if err != nil {
- return nil
- }
- return instances
-}
-
-func (c *RpcClient) newSession(session getty.Session) error {
- var (
- ok bool
- tcpConn *net.TCPConn
- err error
- )
-
- if c.gettyConf.SessionConfig.CompressEncoding {
- session.SetCompressType(getty.CompressZip)
- }
- if _, ok = session.Conn().(*tls.Conn); ok {
- c.setSessionConfig(session)
- log.Debugf("server accepts new tls session:%s\n",
session.Stat())
- return nil
- }
- if _, ok = session.Conn().(*net.TCPConn); !ok {
- panic(fmt.Sprintf("%s, session.conn{%#v} is not a tcp
connection\n", session.Stat(), session.Conn()))
- }
-
- if _, ok = session.Conn().(*tls.Conn); !ok {
- if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
- return fmt.Errorf("%s, session.conn{%#v} is not tcp
connection", session.Stat(), session.Conn())
- }
-
- if err =
tcpConn.SetNoDelay(c.gettyConf.SessionConfig.TCPNoDelay); err != nil {
- return err
- }
- if err =
tcpConn.SetKeepAlive(c.gettyConf.SessionConfig.TCPKeepAlive); err != nil {
- return err
- }
- if c.gettyConf.SessionConfig.TCPKeepAlive {
- if err =
tcpConn.SetKeepAlivePeriod(c.gettyConf.SessionConfig.KeepAlivePeriod); err !=
nil {
- return err
- }
- }
- if err =
tcpConn.SetReadBuffer(c.gettyConf.SessionConfig.TCPRBufSize); err != nil {
- return err
- }
- if err =
tcpConn.SetWriteBuffer(c.gettyConf.SessionConfig.TCPWBufSize); err != nil {
- return err
- }
- }
-
- c.setSessionConfig(session)
- log.Debugf("rpc_client new session:%s\n", session.Stat())
-
- return nil
-}
-
-func (c *RpcClient) setSessionConfig(session getty.Session) {
- session.SetName(c.gettyConf.SessionConfig.SessionName)
- session.SetMaxMsgLen(c.gettyConf.SessionConfig.MaxMsgLen)
- session.SetPkgHandler(rpcPkgHandler)
- session.SetEventListener(GetGettyClientHandlerInstance())
- session.SetReadTimeout(c.gettyConf.SessionConfig.TCPReadTimeout)
- session.SetWriteTimeout(c.gettyConf.SessionConfig.TCPWriteTimeout)
-
session.SetCronPeriod((int)(c.gettyConf.SessionConfig.CronPeriod.Milliseconds()))
- session.SetWaitTime(c.gettyConf.SessionConfig.WaitTimeout)
-}
diff --git a/pkg/remoting/getty/session_manager.go
b/pkg/remoting/getty/session_manager.go
index 6a68cf1b..602fffed 100644
--- a/pkg/remoting/getty/session_manager.go
+++ b/pkg/remoting/getty/session_manager.go
@@ -18,38 +18,142 @@
package getty
import (
+ "crypto/tls"
+ "fmt"
+ "net"
"reflect"
"sync"
"sync/atomic"
"time"
getty "github.com/apache/dubbo-getty"
+ gxsync "github.com/dubbogo/gost/sync"
+ "seata.apache.org/seata-go/pkg/discovery"
"seata.apache.org/seata-go/pkg/protocol/message"
"seata.apache.org/seata-go/pkg/remoting/config"
"seata.apache.org/seata-go/pkg/remoting/loadbalance"
+ "seata.apache.org/seata-go/pkg/util/log"
)
const (
- maxCheckAliveRetry = 600
- checkAliveInternal = 100
+ maxCheckAliveRetry = 600
+ checkAliveInternal = 100
+ heartBeatRetryTimesKey = "heartbeat-retry-times"
+ maxHeartBeatRetryTimes = 3
)
-var sessionManager = newSessionManager()
+var (
+ sessionManager *SessionManager
+ onceSessionManager = &sync.Once{}
+)
type SessionManager struct {
// serverAddress -> rpc_client.Session -> bool
serverSessions sync.Map
allSessions sync.Map
sessionSize int32
+ gettyConf *config.Config
+}
+
+func initSessionManager(gettyConfig *config.Config) {
+ if sessionManager == nil {
+ onceSessionManager.Do(func() {
+ sessionManager = &SessionManager{
+ allSessions: sync.Map{},
+ serverSessions: sync.Map{},
+ gettyConf: gettyConfig,
+ }
+ sessionManager.init()
+ })
+ }
}
-func newSessionManager() *SessionManager {
- return &SessionManager{
- allSessions: sync.Map{},
- // serverAddress -> rpc_client.Session -> bool
- serverSessions: sync.Map{},
+func (g *SessionManager) init() {
+ addressList := g.getAvailServerList()
+ if len(addressList) == 0 {
+ log.Warn("no have valid seata server list")
+ }
+ for _, address := range addressList {
+ gettyClient := getty.NewTCPClient(
+ getty.WithServerAddress(fmt.Sprintf("%s:%d",
address.Addr, address.Port)),
+ // todo if read c.gettyConf.ConnectionNum, will cause
the connect to fail
+ getty.WithConnectionNumber(1),
+
getty.WithReconnectInterval(g.gettyConf.ReconnectInterval),
+ getty.WithClientTaskPool(gxsync.NewTaskPoolSimple(0)),
+ )
+ go gettyClient.RunEventLoop(g.newSession)
+ }
+}
+
+func (g *SessionManager) getAvailServerList() []*discovery.ServiceInstance {
+ registryService := discovery.GetRegistry()
+ instances, err :=
registryService.Lookup(config.GetSeataConfig().TxServiceGroup)
+ if err != nil {
+ return nil
+ }
+ return instances
+}
+
+func (g *SessionManager) setSessionConfig(session getty.Session) {
+ session.SetName(g.gettyConf.SessionConfig.SessionName)
+ session.SetMaxMsgLen(g.gettyConf.SessionConfig.MaxMsgLen)
+ session.SetPkgHandler(rpcPkgHandler)
+ session.SetEventListener(GetGettyClientHandlerInstance())
+ session.SetReadTimeout(g.gettyConf.SessionConfig.TCPReadTimeout)
+ session.SetWriteTimeout(g.gettyConf.SessionConfig.TCPWriteTimeout)
+
session.SetCronPeriod((int)(g.gettyConf.SessionConfig.CronPeriod.Milliseconds()))
+ session.SetWaitTime(g.gettyConf.SessionConfig.WaitTimeout)
+ session.SetAttribute(heartBeatRetryTimesKey, 0)
+}
+
+func (g *SessionManager) newSession(session getty.Session) error {
+ var (
+ ok bool
+ tcpConn *net.TCPConn
+ err error
+ )
+
+ if g.gettyConf.SessionConfig.CompressEncoding {
+ session.SetCompressType(getty.CompressZip)
}
+ if _, ok = session.Conn().(*tls.Conn); ok {
+ g.setSessionConfig(session)
+ log.Debugf("server accepts new tls session:%s\n",
session.Stat())
+ return nil
+ }
+ if _, ok = session.Conn().(*net.TCPConn); !ok {
+ panic(fmt.Sprintf("%s, session.conn{%#v} is not a tcp
connection\n", session.Stat(), session.Conn()))
+ }
+
+ if _, ok = session.Conn().(*tls.Conn); !ok {
+ if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
+ return fmt.Errorf("%s, session.conn{%#v} is not tcp
connection", session.Stat(), session.Conn())
+ }
+
+ if err =
tcpConn.SetNoDelay(g.gettyConf.SessionConfig.TCPNoDelay); err != nil {
+ return err
+ }
+ if err =
tcpConn.SetKeepAlive(g.gettyConf.SessionConfig.TCPKeepAlive); err != nil {
+ return err
+ }
+ if g.gettyConf.SessionConfig.TCPKeepAlive {
+ if err =
tcpConn.SetKeepAlivePeriod(g.gettyConf.SessionConfig.KeepAlivePeriod); err !=
nil {
+ return err
+ }
+ }
+ if err =
tcpConn.SetReadBuffer(g.gettyConf.SessionConfig.TCPRBufSize); err != nil {
+ return err
+ }
+ if err =
tcpConn.SetWriteBuffer(g.gettyConf.SessionConfig.TCPWBufSize); err != nil {
+ return err
+ }
+ }
+
+ g.setSessionConfig(session)
+ log.Debugf("rpc_client new session:%s\n", session.Stat())
+
+ return nil
}
func (g *SessionManager) selectSession(msg interface{}) getty.Session {
diff --git a/pkg/remoting/processor/client/client_on_response_processor.go
b/pkg/remoting/processor/client/client_on_response_processor.go
index 55644b77..ada44961 100644
--- a/pkg/remoting/processor/client/client_on_response_processor.go
+++ b/pkg/remoting/processor/client/client_on_response_processor.go
@@ -46,27 +46,28 @@ type clientOnResponseProcessor struct{}
func (f *clientOnResponseProcessor) Process(ctx context.Context, rpcMessage
message.RpcMessage) error {
log.Infof("the rm client received clientOnResponse msg %#v from tc
server.", rpcMessage)
+ gettyRemotingClient := getty.GetGettyRemotingClient()
if mergedResult, ok := rpcMessage.Body.(message.MergeResultMessage); ok
{
- mergedMessage :=
getty.GetGettyRemotingInstance().GetMergedMessage(rpcMessage.ID)
+ mergedMessage :=
gettyRemotingClient.GetMergedMessage(rpcMessage.ID)
if mergedMessage != nil {
for i := 0; i < len(mergedMessage.Msgs); i++ {
msgID := mergedMessage.MsgIds[i]
- response :=
getty.GetGettyRemotingInstance().GetMessageFuture(msgID)
+ response :=
gettyRemotingClient.GetMessageFuture(msgID)
if response != nil {
response.Response = mergedResult.Msgs[i]
response.Done <- struct{}{}
-
getty.GetGettyRemotingInstance().RemoveMessageFuture(msgID)
+
gettyRemotingClient.RemoveMessageFuture(msgID)
}
}
-
getty.GetGettyRemotingInstance().RemoveMergedMessageFuture(rpcMessage.ID)
+
gettyRemotingClient.RemoveMergedMessageFuture(rpcMessage.ID)
}
return nil
} else {
// 如果是请求消息,做处理逻辑
- msgFuture :=
getty.GetGettyRemotingInstance().GetMessageFuture(rpcMessage.ID)
+ msgFuture := gettyRemotingClient.GetMessageFuture(rpcMessage.ID)
if msgFuture != nil {
-
getty.GetGettyRemotingInstance().NotifyRpcMessageResponse(rpcMessage)
-
getty.GetGettyRemotingInstance().RemoveMessageFuture(rpcMessage.ID)
+ gettyRemotingClient.NotifyRpcMessageResponse(rpcMessage)
+ gettyRemotingClient.RemoveMessageFuture(rpcMessage.ID)
} else {
if _, ok :=
rpcMessage.Body.(message.AbstractResultMessage); ok {
log.Infof("the rm client received response msg
[{}] from tc server.", msgFuture)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]