This is an automated email from the ASF dual-hosted git repository.

goodboycoder pushed a commit to branch feature/support-raft
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git


The following commit(s) were added to refs/heads/feature/support-raft by this 
push:
     new dae0737a OSPP 2025 Go&Java Version alignment  / support-raft (#923)
dae0737a is described below

commit dae0737a2b93ba18615eaa46f406a61f208b9e37
Author: flypiggy <[email protected]>
AuthorDate: Thu Oct 30 22:59:23 2025 +0800

    OSPP 2025 Go&Java Version alignment  / support-raft (#923)
    
    upd-raft
---
 pkg/discovery/base.go                           |   1 +
 pkg/discovery/config.go                         |  27 +-
 pkg/discovery/init.go                           |   2 +
 pkg/discovery/{base.go => metadata/constant.go} |  31 +-
 pkg/discovery/metadata/metadata.go              | 161 ++++++++
 pkg/discovery/{base.go => metadata/node.go}     |  34 +-
 pkg/discovery/raft.go                           | 522 ++++++++++++++++++++++++
 pkg/discovery/raft_test.go                      | 450 ++++++++++++++++++++
 testdata/conf/seatago.yml                       |   9 +-
 9 files changed, 1198 insertions(+), 39 deletions(-)

diff --git a/pkg/discovery/base.go b/pkg/discovery/base.go
index fcc0d016..f3b5a7a5 100644
--- a/pkg/discovery/base.go
+++ b/pkg/discovery/base.go
@@ -26,6 +26,7 @@ const (
        ZK     string = "zk"
        CONSUL string = "consul"
        SOFA   string = "sofa"
+       RAFT   string = "raft"
 )
 
 type ServiceInstance struct {
diff --git a/pkg/discovery/config.go b/pkg/discovery/config.go
index d8a78874..e80c046a 100644
--- a/pkg/discovery/config.go
+++ b/pkg/discovery/config.go
@@ -38,17 +38,24 @@ func (cfg *ServiceConfig) RegisterFlagsWithPrefix(prefix 
string, f *flag.FlagSet
 }
 
 type RegistryConfig struct {
-       Type  string      `yaml:"type" json:"type" koanf:"type"`
-       File  FileConfig  `yaml:"file" json:"file" koanf:"file"`
-       Nacos NacosConfig `yaml:"nacos" json:"nacos" koanf:"nacos"`
-       Etcd3 Etcd3Config `yaml:"etcd3" json:"etcd3" koanf:"etcd3"`
+       Type             string      `yaml:"type" json:"type" koanf:"type"`
+       NamingserverAddr string      `yaml:"namingserver-addr" 
json:"namingserver-addr" koanf:"namingserver-addr"`
+       Username         string      `yaml:"username" json:"username" 
koanf:"username"`
+       Password         string      `yaml:"password" json:"password" 
koanf:"password"`
+       File             FileConfig  `yaml:"file" json:"file" koanf:"file"`
+       Nacos            NacosConfig `yaml:"nacos" json:"nacos" koanf:"nacos"`
+       Etcd3            Etcd3Config `yaml:"etcd3" json:"etcd3" koanf:"etcd3"`
+       Raft             RaftConfig  `yaml:"raft" json:"raft" koanf:"raft"`
 }
 
 func (cfg *RegistryConfig) RegisterFlagsWithPrefix(prefix string, f 
*flag.FlagSet) {
        f.StringVar(&cfg.Type, prefix+".type", "file", "The registry type.")
+       f.StringVar(&cfg.Username, prefix+".username", "seata", "Username for 
authentication")
+       f.StringVar(&cfg.Password, prefix+".password", "seata", "Password for 
authentication")
        cfg.File.RegisterFlagsWithPrefix(prefix+".file", f)
        cfg.Nacos.RegisterFlagsWithPrefix(prefix+".nacos", f)
        cfg.Etcd3.RegisterFlagsWithPrefix(prefix+".etcd3", f)
+       cfg.Raft.RegisterFlagsWithPrefix(prefix+".raft", f)
 }
 
 type FileConfig struct {
@@ -90,3 +97,15 @@ func (cfg *Etcd3Config) RegisterFlagsWithPrefix(prefix 
string, f *flag.FlagSet)
        f.StringVar(&cfg.Cluster, prefix+".cluster", "default", "The server 
address of registry.")
        f.StringVar(&cfg.ServerAddr, prefix+".server-addr", 
"http://localhost:2379";, "The server address of registry.")
 }
+
+type RaftConfig struct {
+       MetadataMaxAgeMs            int64  `yaml:"metadata-max-age-ms" 
json:"metadata-max-age-ms" koanf:"metadata-max-age-ms"`
+       ServerAddr                  string `yaml:"serverAddr" 
json:"server-addr" koanf:"server-addr"`
+       TokenValidityInMilliseconds int64  
`yaml:"token-validity-in-milliseconds" json:"token-validity-in-milliseconds" 
koanf:"token-validity-in-milliseconds"`
+}
+
+func (cfg *RaftConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) 
{
+       f.Int64Var(&cfg.MetadataMaxAgeMs, prefix+".metadata-max-age-ms", 30000, 
"Maximum age of metadata in milliseconds before refresh")
+       f.StringVar(&cfg.ServerAddr, prefix+".server-addr", "127.0.0.1:7091", 
"The server address of raft registry")
+       f.Int64Var(&cfg.TokenValidityInMilliseconds, 
prefix+".token-validity-in-milliseconds", 1740000, "Token validity duration in 
milliseconds")
+}
diff --git a/pkg/discovery/init.go b/pkg/discovery/init.go
index 8e7f5e38..3fb6872d 100644
--- a/pkg/discovery/init.go
+++ b/pkg/discovery/init.go
@@ -35,6 +35,8 @@ func InitRegistry(serviceConfig *ServiceConfig, 
registryConfig *RegistryConfig)
        case ETCD:
                //init etcd registry
                registryService = newEtcdRegistryService(serviceConfig, 
&registryConfig.Etcd3)
+       case RAFT:
+               registryService = NewRaftRegistryService(serviceConfig, 
registryConfig)
        case NACOS:
                //TODO: init nacos registry
        case EUREKA:
diff --git a/pkg/discovery/base.go b/pkg/discovery/metadata/constant.go
similarity index 69%
copy from pkg/discovery/base.go
copy to pkg/discovery/metadata/constant.go
index fcc0d016..a86ba903 100644
--- a/pkg/discovery/base.go
+++ b/pkg/discovery/metadata/constant.go
@@ -15,25 +15,22 @@
  * limitations under the License.
  */
 
-package discovery
+package metadata
+
+type ClusterRole string
 
 const (
-       FILE   string = "file"
-       NACOS  string = "nacos"
-       ETCD   string = "etcd"
-       EUREKA string = "eureka"
-       REDIS  string = "redis"
-       ZK     string = "zk"
-       CONSUL string = "consul"
-       SOFA   string = "sofa"
+       LEADER   ClusterRole = "LEADER"
+       FOLLOWER ClusterRole = "FOLLOWER"
+       LEARNER  ClusterRole = "LEARNER"
+       MEMBER   ClusterRole = "MEMBER"
 )
 
-type ServiceInstance struct {
-       Addr string
-       Port int
-}
+type StoreMode string
 
-type RegistryService interface {
-       Lookup(key string) ([]*ServiceInstance, error)
-       Close()
-}
+const (
+       FILE  StoreMode = "file"
+       DB    StoreMode = "db"
+       REDIS StoreMode = "redis"
+       RAFT  StoreMode = "raft"
+)
diff --git a/pkg/discovery/metadata/metadata.go 
b/pkg/discovery/metadata/metadata.go
new file mode 100644
index 00000000..0b088daf
--- /dev/null
+++ b/pkg/discovery/metadata/metadata.go
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package metadata
+
+import (
+       "math/rand"
+       "sync"
+)
+
+type MetadataResponse struct {
+       Nodes     []*Node
+       StoreMode string
+       Term      int64
+}
+
+type Metadata struct {
+       leaders      sync.Map // clusterName -> sync.Map(group -> *Node)
+       clusterTerm  sync.Map // clusterName -> sync.Map(group -> int64)
+       clusterNodes sync.Map // clusterName -> sync.Map(group -> []*Node)
+       storeMode    StoreMode
+}
+
+func NewMetadata() *Metadata {
+       return &Metadata{
+               storeMode: FILE,
+       }
+}
+
+func (m *Metadata) GetLeader(clusterName string) *Node {
+       if groupMapAny, ok := m.leaders.Load(clusterName); ok {
+               if groupMap, ok := groupMapAny.(*sync.Map); ok {
+                       var nodes []*Node
+                       groupMap.Range(func(_, value any) bool {
+                               if node, ok := value.(*Node); ok {
+                                       nodes = append(nodes, node)
+                               }
+                               return true
+                       })
+                       if len(nodes) > 0 {
+                               return nodes[rand.Intn(len(nodes))]
+                       }
+               }
+       }
+       return nil
+}
+
+func (m *Metadata) GetNodes(clusterName, group string) []*Node {
+       clusterNodesAny, ok := m.clusterNodes.Load(clusterName)
+       if !ok {
+               return nil
+       }
+       clusterMap, ok := clusterNodesAny.(*sync.Map)
+       if !ok {
+               return nil
+       }
+
+       if group == "" {
+               var result []*Node
+               clusterMap.Range(func(_, value any) bool {
+                       if nodes, ok := value.([]*Node); ok {
+                               result = append(result, nodes...)
+                       }
+                       return true
+               })
+               return result
+       }
+
+       if nodesAny, ok := clusterMap.Load(group); ok {
+               if nodes, ok := nodesAny.([]*Node); ok {
+                       return nodes
+               }
+       }
+       return nil
+}
+
+func (m *Metadata) SetNodes(clusterName, group string, nodes []*Node) {
+       clusterMapAny, _ := m.clusterNodes.LoadOrStore(clusterName, &sync.Map{})
+       clusterMap := clusterMapAny.(*sync.Map)
+       clusterMap.Store(group, nodes)
+}
+
+func (m *Metadata) ContainsGroup(clusterName string) bool {
+       _, ok := m.clusterNodes.Load(clusterName)
+       return ok
+}
+
+func (m *Metadata) Groups(clusterName string) []string {
+       if clusterAny, ok := m.clusterNodes.Load(clusterName); ok {
+               if cluster, ok := clusterAny.(*sync.Map); ok {
+                       var groups []string
+                       cluster.Range(func(key, _ any) bool {
+                               if group, ok := key.(string); ok {
+                                       groups = append(groups, group)
+                               }
+                               return true
+                       })
+                       return groups
+               }
+       }
+       return nil
+}
+
+func (m *Metadata) GetClusterTerm(clusterName string) map[string]int64 {
+       if termMapAny, ok := m.clusterTerm.Load(clusterName); ok {
+               if termMap, ok := termMapAny.(*sync.Map); ok {
+                       result := make(map[string]int64)
+                       termMap.Range(func(key, value any) bool {
+                               k, ok1 := key.(string)
+                               v, ok2 := value.(int64)
+                               if ok1 && ok2 {
+                                       result[k] = v
+                               }
+                               return true
+                       })
+                       return result
+               }
+       }
+       return nil
+}
+
+func (m *Metadata) RefreshMetadata(clusterName string, response 
MetadataResponse) {
+       var nodes []*Node
+       for _, node := range response.Nodes {
+               if node.Role == LEADER {
+                       groupMapAny, _ := m.leaders.LoadOrStore(clusterName, 
&sync.Map{})
+                       groupMap := groupMapAny.(*sync.Map)
+                       groupMap.Store(node.Group, node)
+               }
+               nodes = append(nodes, node)
+       }
+
+       switch response.StoreMode {
+       case "RAFT":
+               m.storeMode = RAFT
+       default:
+               m.storeMode = FILE
+       }
+
+       if len(nodes) > 0 {
+               group := nodes[0].Group
+               m.SetNodes(clusterName, group, nodes)
+               termMapAny, _ := m.clusterTerm.LoadOrStore(clusterName, 
&sync.Map{})
+               termMap := termMapAny.(*sync.Map)
+               termMap.Store(group, response.Term)
+       }
+}
diff --git a/pkg/discovery/base.go b/pkg/discovery/metadata/node.go
similarity index 68%
copy from pkg/discovery/base.go
copy to pkg/discovery/metadata/node.go
index fcc0d016..ab931073 100644
--- a/pkg/discovery/base.go
+++ b/pkg/discovery/metadata/node.go
@@ -15,25 +15,25 @@
  * limitations under the License.
  */
 
-package discovery
+package metadata
 
-const (
-       FILE   string = "file"
-       NACOS  string = "nacos"
-       ETCD   string = "etcd"
-       EUREKA string = "eureka"
-       REDIS  string = "redis"
-       ZK     string = "zk"
-       CONSUL string = "consul"
-       SOFA   string = "sofa"
-)
+type Node struct {
+       Control     *Endpoint
+       Transaction *Endpoint
+       Internal    *Endpoint
 
-type ServiceInstance struct {
-       Addr string
-       Port int
+       Weight    float64
+       Healthy   bool
+       Timestamp int64
+
+       Group    string
+       Role     ClusterRole
+       Version  string
+       Metadata map[string]interface{}
 }
 
-type RegistryService interface {
-       Lookup(key string) ([]*ServiceInstance, error)
-       Close()
+type Endpoint struct {
+       Host     string
+       Protocol string
+       Port     int
 }
diff --git a/pkg/discovery/raft.go b/pkg/discovery/raft.go
new file mode 100644
index 00000000..c1e573a4
--- /dev/null
+++ b/pkg/discovery/raft.go
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package discovery
+
+import (
+       "bytes"
+       "context"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "io"
+       "math/rand"
+       "net"
+       "net/http"
+       "net/url"
+       "strconv"
+       "strings"
+       "sync"
+       "time"
+
+       "seata.apache.org/seata-go/pkg/discovery/metadata"
+       "seata.apache.org/seata-go/pkg/util/log"
+)
+
+const (
+       controlEndpoint     = "control"
+       transactionEndpoint = "transaction"
+)
+
+type RaftRegistryService struct {
+       cfg                            *RaftConfig
+       metadata                       *metadata.Metadata
+       initAddresses                  sync.Map // clusterName -> 
[]*ServiceInstance
+       aliveNodes                     sync.Map // transactionServiceGroup -> 
[]*ServiceInstance
+       vgroupMapping                  map[string]string
+       namingserverAddress            string
+       username                       string
+       password                       string
+       jwtToken                       string
+       tokenTimestamp                 int64
+       currentTransactionServiceGroup string
+       currentTransactionClusterName  string
+       mu                             sync.RWMutex
+       stopCh                         chan struct{}
+       refreshOnce                    sync.Once
+       httpClient                     *http.Client
+       random                         *rand.Rand
+}
+
+func NewRaftRegistryService(config *ServiceConfig, raftConfig *RegistryConfig) 
*RaftRegistryService {
+       vgroupMapping := config.VgroupMapping
+
+       r := &RaftRegistryService{
+               cfg:                 &raftConfig.Raft,
+               metadata:            metadata.NewMetadata(),
+               initAddresses:       sync.Map{},
+               aliveNodes:          sync.Map{},
+               vgroupMapping:       vgroupMapping,
+               namingserverAddress: raftConfig.NamingserverAddr,
+               username:            raftConfig.Username,
+               password:            raftConfig.Password,
+               stopCh:              make(chan struct{}),
+               httpClient:          &http.Client{},
+               tokenTimestamp:      -1,
+               random:              
rand.New(rand.NewSource(time.Now().UnixNano())),
+       }
+       return r
+}
+
+func (r *RaftRegistryService) Lookup(key string) ([]*ServiceInstance, error) {
+       clusterName := r.vgroupMapping[key]
+       if clusterName == "" {
+               return nil, fmt.Errorf("cluster doesnt exist")
+       }
+       r.currentTransactionServiceGroup = key
+       r.currentTransactionClusterName = clusterName
+
+       if !r.metadata.ContainsGroup(clusterName) {
+               if _, ok := r.loadInitAddresses(clusterName); !ok && 
r.cfg.ServerAddr != "" {
+                       addrs := strings.Split(r.cfg.ServerAddr, ",")
+                       list := make([]*ServiceInstance, 0, len(addrs))
+                       for _, addr := range addrs {
+                               h, p, err := 
net.SplitHostPort(strings.TrimSpace(addr))
+                               if err != nil {
+                                       log.Infof("invalid init server addr: 
%s, err: %v", addr, err)
+                                       continue
+                               }
+                               port, err := strconv.Atoi(p)
+                               if err != nil {
+                                       log.Errorf("invalid port: %s", p)
+                                       continue
+                               }
+                               list = append(list, &ServiceInstance{Addr: h, 
Port: port})
+                       }
+                       if len(list) == 0 {
+                               return nil, fmt.Errorf("invalid service 
group/key: %s", key)
+                       }
+                       r.initAddresses.Store(clusterName, list)
+
+                       if err := r.refreshToken(); err != nil {
+                               return nil, err
+                       }
+
+                       err := r.acquireClusterMetaData(clusterName, "")
+                       if err != nil {
+                               return nil, err
+                       }
+                       r.startQueryMetadata()
+               }
+       }
+       return r.getServiceInstances(clusterName, "")
+}
+
+func (r *RaftRegistryService) aliveLookup(transactionServiceGroup string) 
([]*ServiceInstance, error) {
+       clusterName := r.vgroupMapping[transactionServiceGroup]
+       if clusterName == "" {
+               return nil, fmt.Errorf("cluster doesnt exist")
+       }
+       leader := r.metadata.GetLeader(clusterName)
+       if leader != nil {
+               endpoint, err := r.selectEndpoint(transactionEndpoint, leader)
+               if err != nil {
+                       return nil, err
+               }
+               return []*ServiceInstance{endpoint}, nil
+       }
+       return r.getServiceInstances(clusterName, "")
+}
+
+func (r *RaftRegistryService) getServiceInstances(clusterName, group string) 
([]*ServiceInstance, error) {
+       nodes := r.metadata.GetNodes(clusterName, group)
+       if len(nodes) > 0 {
+               instances := make([]*ServiceInstance, 0, len(nodes))
+               for _, n := range nodes {
+                       inst, _ := r.selectEndpoint(transactionEndpoint, n)
+                       if inst != nil {
+                               instances = append(instances, inst)
+                       }
+               }
+               return instances, nil
+       }
+       return nil, nil
+}
+
+func (r *RaftRegistryService) RefreshAliveLookup(transactionServiceGroup 
string, aliveAddress []*ServiceInstance) ([]*ServiceInstance, error) {
+       clusterName := r.vgroupMapping[transactionServiceGroup]
+       if clusterName == "" {
+               return nil, fmt.Errorf("cluster not found for serviceGroup=%s", 
transactionServiceGroup)
+       }
+
+       leader := r.metadata.GetLeader(clusterName)
+       if leader == nil {
+               return nil, fmt.Errorf("leader not found for cluster=%s", 
clusterName)
+       }
+
+       leaderEndpoint, err := r.selectEndpoint(transactionEndpoint, leader)
+       if err != nil {
+               return nil, err
+       }
+
+       var result []*ServiceInstance
+       for _, addr := range aliveAddress {
+               if addr.Port != leaderEndpoint.Port || addr.Addr != 
leaderEndpoint.Addr {
+                       result = append(result, addr)
+               }
+       }
+
+       r.aliveNodes.Store(transactionServiceGroup, result)
+       return result, nil
+}
+
+func (r *RaftRegistryService) Close() {
+       select {
+       case <-r.stopCh:
+       default:
+               close(r.stopCh)
+       }
+}
+
+func (r *RaftRegistryService) selectEndpoint(t string, n *metadata.Node) 
(*ServiceInstance, error) {
+       switch t {
+       case controlEndpoint:
+               return &ServiceInstance{
+                       Addr: n.Control.Host,
+                       Port: n.Control.Port,
+               }, nil
+       case transactionEndpoint:
+               return &ServiceInstance{
+                       Addr: n.Transaction.Host,
+                       Port: n.Transaction.Port,
+               }, nil
+       default:
+               return nil, fmt.Errorf("SelectEndpoint is not support type: 
%s", t)
+       }
+}
+
+func (r *RaftRegistryService) startQueryMetadata() {
+       r.refreshOnce.Do(func() {
+               go func() {
+                       metadataMaxAge := int64(30000)
+                       if r.cfg.MetadataMaxAgeMs > 0 {
+                               metadataMaxAge = r.cfg.MetadataMaxAgeMs
+                       }
+                       currentTime := time.Now().UnixMilli()
+                       ticker := time.NewTicker(5 * time.Second)
+                       defer ticker.Stop()
+
+                       for {
+                               select {
+                               case <-r.stopCh:
+                                       log.Info("raft registry service 
stopped")
+                                       return
+                               case <-ticker.C:
+                                       func() {
+                                               shouldFetch := 
time.Now().UnixMilli()-currentTime > metadataMaxAge
+                                               if !shouldFetch {
+                                                       ok, err := r.watch()
+                                                       if err != nil {
+                                                               
log.Errorf("watch error: %v", err)
+                                                               shouldFetch = 
true
+                                                       } else {
+                                                               shouldFetch = ok
+                                                       }
+                                               }
+
+                                               if shouldFetch {
+                                                       clusterName := 
r.currentTransactionClusterName
+                                                       groups := 
r.metadata.Groups(clusterName)
+                                                       if len(groups) == 0 {
+                                                               groups = 
append(groups, "")
+                                                       }
+                                                       for _, g := range 
groups {
+                                                               err := 
r.acquireClusterMetaData(clusterName, g)
+                                                               if err != nil {
+                                                                       
log.Errorf("acquire cluster metadata failed: cluster=%s group=%s err=%v", 
clusterName, g, err)
+                                                               }
+                                                       }
+
+                                                       currentTime = 
time.Now().UnixMilli()
+                                               }
+                                       }()
+                               }
+                       }
+               }()
+       })
+}
+
+func (r *RaftRegistryService) watch() (bool, error) {
+       header := map[string]string{
+               "Content-Type": "application/x-www-form-urlencoded",
+       }
+       clusterNames := r.clusterNamesFromInit()
+       for _, clusterName := range clusterNames {
+               groupTerms := r.metadata.GetClusterTerm(clusterName)
+               if groupTerms == nil {
+                       groupTerms = map[string]int64{"": 0}
+               }
+               for group := range groupTerms {
+                       tcAddress, err := r.queryHttpAddress(clusterName, group)
+                       if err != nil {
+                               log.Infof("no tc address to watch for cluster 
%s: %v", clusterName, err)
+                               continue
+                       }
+                       if r.isTokenExpired() {
+                               if err = r.refreshToken(); err != nil {
+                                       return false, err
+                               }
+                       }
+                       if r.jwtToken != "" {
+                               header["Authorization"] = r.jwtToken
+                       }
+
+                       form := url.Values{}
+                       for k, v := range groupTerms {
+                               form.Set(k, strconv.FormatInt(v, 10))
+                       }
+
+                       endpoint := fmt.Sprintf("http://%s/metadata/v1/watch";, 
tcAddress)
+                       req, err := http.NewRequest("POST", endpoint, 
strings.NewReader(form.Encode()))
+                       if err != nil {
+                               return false, err
+                       }
+                       for hk, hv := range header {
+                               req.Header.Set(hk, hv)
+                       }
+                       resp, err := r.doRequest(req, 30*time.Second)
+                       if err != nil {
+                               log.Errorf("watch cluster node: %s, fail: %v", 
tcAddress, err)
+                               return false, err
+                       }
+                       defer resp.Body.Close()
+                       if resp.StatusCode == http.StatusUnauthorized {
+                               return false, errors.New("authentication 
failed: missing username/password")
+                       }
+                       return resp.StatusCode == http.StatusOK, nil
+               }
+       }
+       return false, nil
+}
+
+func (r *RaftRegistryService) acquireClusterMetaData(clusterName, group 
string) error {
+       tcAddress, err := r.queryHttpAddress(clusterName, group)
+       if err != nil {
+               return err
+       }
+       headers := map[string]string{
+               "Content-Type": "application/x-www-form-urlencoded",
+       }
+       if r.isTokenExpired() {
+               if err = r.refreshToken(); err != nil {
+                       return err
+               }
+       }
+       if r.jwtToken != "" {
+               headers["Authorization"] = r.jwtToken
+       }
+       u := fmt.Sprintf("http://%s/metadata/v1/cluster";, tcAddress)
+       req, err := http.NewRequest("GET", u, nil)
+       if err != nil {
+               return err
+       }
+       q := req.URL.Query()
+       q.Add("group", group)
+       req.URL.RawQuery = q.Encode()
+       for hk, hv := range headers {
+               req.Header.Set(hk, hv)
+       }
+
+       resp, err := r.doRequest(req, 1*time.Second)
+       if err != nil {
+               return fmt.Errorf("http get cluster failed: %w", err)
+       }
+       defer resp.Body.Close()
+
+       if resp.StatusCode == http.StatusOK {
+               body, _ := io.ReadAll(resp.Body)
+               var mr metadata.MetadataResponse
+               if err = json.Unmarshal(body, &mr); err != nil {
+                       return fmt.Errorf("unmarshal metadataResponse failed: 
%w", err)
+               }
+               r.metadata.RefreshMetadata(clusterName, mr)
+               return nil
+       } else if resp.StatusCode == http.StatusUnauthorized {
+               if err = r.refreshToken(); err != nil {
+                       return err
+               }
+               return fmt.Errorf("authentication failed! you should configure 
the correct username and password")
+       }
+       return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+}
+
+/* -------------------- Token management -------------------- */
+
+func (r *RaftRegistryService) isTokenExpired() bool {
+       r.mu.RLock()
+       ts := r.tokenTimestamp
+       r.mu.RUnlock()
+       if ts == -1 {
+               return true
+       }
+       valid := int64(29 * 60 * 1000)
+       if r.cfg.TokenValidityInMilliseconds > 0 {
+               valid = r.cfg.TokenValidityInMilliseconds
+       }
+       expireTime := ts + valid
+       return time.Now().UnixMilli() >= expireTime
+}
+
+func (r *RaftRegistryService) refreshToken() error {
+       address := r.namingserverAddress
+       body, _ := json.Marshal(map[string]string{
+               "username": r.username,
+               "password": r.password,
+       })
+       req, err := http.NewRequest("POST", 
fmt.Sprintf("http://%s/api/v1/auth/login";, address), bytes.NewReader(body))
+       if err != nil {
+               return err
+       }
+       req.Header.Set("Content-Type", "application/json")
+       resp, err := r.doRequest(req, 1*time.Second)
+       if err != nil {
+               return fmt.Errorf("refresh token failed: %w", err)
+       }
+       defer resp.Body.Close()
+       if resp.StatusCode != http.StatusOK {
+               return errors.New("authentication failed when refresh token")
+       }
+       respBody, _ := io.ReadAll(resp.Body)
+       var node map[string]interface{}
+       if err = json.Unmarshal(respBody, &node); err != nil {
+               return fmt.Errorf("invalid auth response: %w", err)
+       }
+       code, _ := node["code"].(string)
+       if code != "" && code != "200" {
+               return errors.New("authentication failed! you should configure 
the correct username and password")
+       }
+       dataVal := node["data"].(string)
+       r.mu.Lock()
+       r.jwtToken = dataVal
+       r.tokenTimestamp = time.Now().UnixMilli()
+       r.mu.Unlock()
+       return nil
+}
+
+func (r *RaftRegistryService) queryHttpAddress(clusterName, group string) 
(string, error) {
+       var addressList []string
+
+       nodes := r.metadata.GetNodes(clusterName, group)
+
+       formatAddr := func(addr string, port int) string {
+               return fmt.Sprintf("%s:%d", addr, port)
+       }
+
+       appendControlAddr := func(node *metadata.Node) error {
+               endpoint, err := r.selectEndpoint(controlEndpoint, node)
+               if err != nil {
+                       return fmt.Errorf("failed to select control endpoint: 
%v", err)
+               }
+               addressList = append(addressList, formatAddr(endpoint.Addr, 
endpoint.Port))
+               return nil
+       }
+
+       if len(nodes) > 0 {
+               currentServiceGroup := r.currentTransactionServiceGroup
+               if aliveAny, ok := r.aliveNodes.Load(currentServiceGroup); ok {
+                       aliveNodes, ok := aliveAny.([]*ServiceInstance)
+                       if !ok {
+                               log.Errorf("invalid type for alive nodes, 
expected []*ServiceInstance")
+                               return "", fmt.Errorf("invalid alive nodes type 
for service group %s", currentServiceGroup)
+                       }
+
+                       if len(aliveNodes) == 0 {
+                               for _, node := range nodes {
+                                       if err := appendControlAddr(node); err 
!= nil {
+                                               return "", err
+                                       }
+                               }
+                       } else {
+                               m := make(map[string]*metadata.Node)
+                               for _, node := range nodes {
+                                       inetSocketAddress, _ := 
r.selectEndpoint(transactionEndpoint, node)
+                                       m[formatAddr(inetSocketAddress.Addr, 
inetSocketAddress.Port)] = node
+                               }
+
+                               for _, alive := range aliveNodes {
+                                       addrKey := formatAddr(alive.Addr, 
alive.Port)
+                                       node, exists := m[addrKey]
+                                       if !exists {
+                                               continue
+                                       }
+
+                                       contrEndpoint, err := 
r.selectEndpoint(controlEndpoint, node)
+                                       if err != nil {
+                                               log.Errorf("failed to select 
control endpoint for node: %v", err)
+                                               continue
+                                       }
+                                       addressList = append(addressList, 
formatAddr(contrEndpoint.Addr, contrEndpoint.Port))
+                               }
+                       }
+               } else {
+                       for _, node := range nodes {
+                               if err := appendControlAddr(node); err != nil {
+                                       return "", err
+                               }
+                       }
+               }
+       } else {
+               if initAny, ok := r.initAddresses.Load(clusterName); ok {
+                       for _, inetSocketAddress := range 
initAny.([]*ServiceInstance) {
+                               addressList = append(addressList, 
formatAddr(inetSocketAddress.Addr, inetSocketAddress.Port))
+                       }
+               }
+       }
+
+       if len(addressList) == 0 {
+               return "", fmt.Errorf("no available address for cluster=%s 
group=%s", clusterName, group)
+       }
+       return addressList[r.random.Intn(len(addressList))], nil
+}
+
+func (r *RaftRegistryService) clusterNamesFromInit() []string {
+       var names []string
+       r.initAddresses.Range(func(key, _ any) bool {
+               if k, ok := key.(string); ok {
+                       names = append(names, k)
+               }
+               return true
+       })
+       return names
+}
+
+func (r *RaftRegistryService) doRequest(req *http.Request, timeout 
time.Duration) (*http.Response, error) {
+       ctx, cancel := context.WithTimeout(req.Context(), timeout)
+       defer cancel()
+
+       req = req.WithContext(ctx)
+       return r.httpClient.Do(req)
+}
+
+func (r *RaftRegistryService) loadInitAddresses(clusterName string) 
([]*ServiceInstance, bool) {
+       if val, ok := r.initAddresses.Load(clusterName); ok {
+               if list, ok := val.([]*ServiceInstance); ok {
+                       return list, true
+               }
+       }
+       return nil, false
+}
diff --git a/pkg/discovery/raft_test.go b/pkg/discovery/raft_test.go
new file mode 100644
index 00000000..206e04ad
--- /dev/null
+++ b/pkg/discovery/raft_test.go
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package discovery
+
+import (
+       "encoding/json"
+       "github.com/stretchr/testify/assert"
+       "math/rand"
+       "net/http"
+       "net/http/httptest"
+       "seata.apache.org/seata-go/pkg/discovery/metadata"
+       "sync"
+       "sync/atomic"
+       "testing"
+       "time"
+)
+
+var (
+       serviceConfig = &ServiceConfig{VgroupMapping: 
map[string]string{"default_tx_group": "default"}}
+       raftConfig    = RaftConfig{
+               MetadataMaxAgeMs:            int64(30000),
+               ServerAddr:                  "",
+               TokenValidityInMilliseconds: int64(1740000),
+       }
+       registryConfig = &RegistryConfig{
+               Type:             "raft",
+               NamingserverAddr: "",
+               Username:         "seata",
+               Password:         "seata",
+               Raft:             raftConfig,
+       }
+)
+
+func TestMetadataHandler(t *testing.T) {
+       mockServer := httptest.NewServer(http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) {
+               t.Logf("Received request: %s %s", r.Method, r.URL.Path)
+
+               switch r.URL.Path {
+               case "/api/v1/auth/login":
+                       w.Header().Set("Content-Type", "application/json")
+                       w.WriteHeader(http.StatusOK)
+                       _ = json.NewEncoder(w).Encode(map[string]string{
+                               "code": "200",
+                               "data": "mock-jwt-token",
+                       })
+
+               case "/metadata/v1/cluster":
+                       if r.Header.Get("Authorization") != "mock-jwt-token" {
+                               t.Errorf("Missing valid token: %s", 
r.Header.Get("Authorization"))
+                               w.WriteHeader(http.StatusUnauthorized)
+                               return
+                       }
+
+                       resp := metadata.MetadataResponse{
+                               Term: 1,
+                               Nodes: []*metadata.Node{
+                                       {
+                                               Control:     
&metadata.Endpoint{Host: "127.0.0.1", Port: 7001},
+                                               Transaction: 
&metadata.Endpoint{Host: "127.0.0.1", Port: 7001},
+                                               Internal:    
&metadata.Endpoint{Host: "127.0.0.1", Port: 9001},
+                                               Group:       "default",
+                                               Role:        metadata.LEADER,
+                                               Version:     "2.5.0",
+                                               Metadata:    
map[string]interface{}{},
+                                       },
+                                       {
+                                               Control:     
&metadata.Endpoint{Host: "127.0.0.1", Port: 7002},
+                                               Transaction: 
&metadata.Endpoint{Host: "127.0.0.1", Port: 7002},
+                                               Internal:    
&metadata.Endpoint{Host: "127.0.0.1", Port: 9002},
+                                               Group:       "default",
+                                               Role:        metadata.FOLLOWER,
+                                               Version:     "2.5.0",
+                                               Metadata:    
map[string]interface{}{},
+                                       },
+                                       {
+                                               Control:     
&metadata.Endpoint{Host: "127.0.0.1", Port: 7003},
+                                               Transaction: 
&metadata.Endpoint{Host: "127.0.0.1", Port: 7003},
+                                               Internal:    
&metadata.Endpoint{Host: "127.0.0.1", Port: 9003},
+                                               Group:       "default",
+                                               Role:        metadata.FOLLOWER,
+                                               Version:     "2.5.0",
+                                               Metadata:    
map[string]interface{}{},
+                                       },
+                               },
+                       }
+
+                       w.Header().Set("Content-Type", "application/json")
+                       w.WriteHeader(http.StatusOK)
+                       if err := json.NewEncoder(w).Encode(resp); err != nil {
+                               t.Errorf("Failed to encode response: %v", err)
+                       }
+
+               default:
+                       t.Errorf("Unknown path: %s", r.URL.Path)
+                       w.WriteHeader(http.StatusNotFound)
+               }
+       }))
+       defer mockServer.Close()
+       mockAddr := mockServer.Listener.Addr().String()
+       registryConfig.Raft.ServerAddr = mockAddr
+       registryConfig.NamingserverAddr = mockAddr
+       service := NewRaftRegistryService(serviceConfig, registryConfig)
+       instances, err := service.Lookup("default_tx_group")
+       if err != nil {
+               t.Errorf("Failed to lookup service: %v", err)
+       }
+       serviceInstances := []*ServiceInstance{
+               {
+                       Addr: "127.0.0.1",
+                       Port: 7001,
+               }, {
+                       Addr: "127.0.0.1",
+                       Port: 7002,
+               }, {
+                       Addr: "127.0.0.1",
+                       Port: 7003,
+               },
+       }
+       assert.Equal(t, serviceInstances, instances)
+}
+
+func TestMultiClusterWatch(t *testing.T) {
+       clusterRequests := make(map[string]int)
+       var requestMu sync.Mutex
+
+       mockServer := httptest.NewServer(http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) {
+               requestMu.Lock()
+               clusterRequests[r.URL.Path]++
+               requestMu.Unlock()
+
+               switch r.URL.Path {
+               case "/api/v1/auth/login":
+                       w.Header().Set("Content-Type", "application/json")
+                       w.WriteHeader(http.StatusOK)
+                       _ = json.NewEncoder(w).Encode(map[string]string{
+                               "code": "200",
+                               "data": "mock-jwt-token",
+                       })
+
+               case "/metadata/v1/cluster":
+                       resp := metadata.MetadataResponse{
+                               Term: 1,
+                               Nodes: []*metadata.Node{
+                                       {
+                                               Control:     
&metadata.Endpoint{Host: "127.0.0.1", Port: 7001},
+                                               Transaction: 
&metadata.Endpoint{Host: "127.0.0.1", Port: 7001},
+                                               Internal:    
&metadata.Endpoint{Host: "127.0.0.1", Port: 9001},
+                                               Group:       "",
+                                               Role:        metadata.LEADER,
+                                               Version:     "2.5.0",
+                                               Metadata:    
map[string]interface{}{},
+                                       },
+                               },
+                       }
+                       w.Header().Set("Content-Type", "application/json")
+                       w.WriteHeader(http.StatusOK)
+                       _ = json.NewEncoder(w).Encode(resp)
+
+               default:
+                       w.WriteHeader(http.StatusNotFound)
+               }
+       }))
+       defer mockServer.Close()
+
+       multiServiceConfig := &ServiceConfig{
+               VgroupMapping: map[string]string{
+                       "tx_group_1": "cluster1",
+                       "tx_group_2": "cluster2",
+               },
+       }
+
+       mockAddr := mockServer.Listener.Addr().String()
+       multiRegistryConfig := &RegistryConfig{
+               Type:             "raft",
+               NamingserverAddr: mockAddr,
+               Username:         "seata",
+               Password:         "seata",
+               Raft: RaftConfig{
+                       MetadataMaxAgeMs:            int64(30000),
+                       ServerAddr:                  mockAddr + "," + mockAddr,
+                       TokenValidityInMilliseconds: int64(1740000),
+               },
+       }
+
+       service := NewRaftRegistryService(multiServiceConfig, 
multiRegistryConfig)
+       defer service.Close()
+
+       _, err1 := service.Lookup("tx_group_1")
+       assert.NoError(t, err1)
+
+       _, err2 := service.Lookup("tx_group_2")
+       assert.NoError(t, err2)
+
+       requestMu.Lock()
+       clusterCount := clusterRequests["/metadata/v1/cluster"]
+       requestMu.Unlock()
+
+       assert.True(t, clusterCount >= 2, "Multiple clusters should trigger 
multiple cluster metadata requests")
+}
+
+func TestConcurrentAccess(t *testing.T) {
+       mockServer := httptest.NewServer(http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) {
+               switch r.URL.Path {
+               case "/api/v1/auth/login":
+                       w.Header().Set("Content-Type", "application/json")
+                       w.WriteHeader(http.StatusOK)
+                       _ = json.NewEncoder(w).Encode(map[string]string{
+                               "code": "200",
+                               "data": "mock-jwt-token",
+                       })
+
+               case "/metadata/v1/cluster":
+                       resp := metadata.MetadataResponse{
+                               Term: 1,
+                               Nodes: []*metadata.Node{
+                                       {
+                                               Control:     
&metadata.Endpoint{Host: "127.0.0.1", Port: 7001},
+                                               Transaction: 
&metadata.Endpoint{Host: "127.0.0.1", Port: 7001},
+                                               Internal:    
&metadata.Endpoint{Host: "127.0.0.1", Port: 9001},
+                                               Group:       "default",
+                                               Role:        metadata.LEADER,
+                                               Version:     "2.5.0",
+                                               Metadata:    
map[string]interface{}{},
+                                       },
+                               },
+                       }
+                       w.Header().Set("Content-Type", "application/json")
+                       w.WriteHeader(http.StatusOK)
+                       _ = json.NewEncoder(w).Encode(resp)
+
+               case "/metadata/v1/watch":
+                       w.WriteHeader(http.StatusOK)
+
+               default:
+                       w.WriteHeader(http.StatusNotFound)
+               }
+       }))
+       defer mockServer.Close()
+
+       mockAddr := mockServer.Listener.Addr().String()
+       testRegistryConfig := &RegistryConfig{
+               Type:             "raft",
+               NamingserverAddr: mockAddr,
+               Username:         "seata",
+               Password:         "seata",
+               Raft: RaftConfig{
+                       MetadataMaxAgeMs:            int64(30000),
+                       ServerAddr:                  mockAddr,
+                       TokenValidityInMilliseconds: int64(1740000),
+               },
+       }
+
+       service := NewRaftRegistryService(serviceConfig, testRegistryConfig)
+       defer service.Close()
+
+       _, err := service.Lookup("default_tx_group")
+       assert.NoError(t, err)
+
+       var wg sync.WaitGroup
+       successCount := int32(0)
+
+       for i := 0; i < 20; i++ {
+               wg.Add(1)
+               go func(id int) {
+                       defer wg.Done()
+
+                       _, err := service.Lookup("default_tx_group")
+                       if err == nil {
+                               atomic.AddInt32(&successCount, 1)
+                       }
+               }(i)
+       }
+
+       wg.Wait()
+
+       assert.True(t, atomic.LoadInt32(&successCount) > 0, "Some concurrent 
lookups should succeed")
+}
+
+func TestWatchErrorRecovery(t *testing.T) {
+       service := &RaftRegistryService{
+               metadata:                      metadata.NewMetadata(),
+               vgroupMapping:                 map[string]string{"test_group": 
"test_cluster"},
+               currentTransactionClusterName: "test_cluster",
+               random:                        
rand.New(rand.NewSource(time.Now().UnixNano())),
+               initAddresses:                 sync.Map{},
+               cfg:                           
&RaftConfig{TokenValidityInMilliseconds: 1740000},
+               tokenTimestamp:                time.Now().UnixMilli(),
+               httpClient:                    &http.Client{},
+       }
+
+       service.metadata.RefreshMetadata("test_cluster", 
metadata.MetadataResponse{
+               Term: 1,
+               Nodes: []*metadata.Node{
+                       {
+                               Control:     &metadata.Endpoint{Host: 
"127.0.0.1", Port: 7001},
+                               Transaction: &metadata.Endpoint{Host: 
"127.0.0.1", Port: 8001},
+                               Internal:    &metadata.Endpoint{Host: 
"127.0.0.1", Port: 9001},
+                               Group:       "",
+                               Role:        metadata.LEADER,
+                               Version:     "2.5.0",
+                               Metadata:    map[string]interface{}{},
+                       },
+               },
+       })
+
+       service.initAddresses.Store("test_cluster", []*ServiceInstance{
+               {Addr: "127.0.0.1", Port: 7001},
+       })
+
+       result, err := service.watch()
+       assert.Error(t, err)
+       assert.False(t, result)
+
+       address, err := service.queryHttpAddress("test_cluster", "")
+       assert.NoError(t, err)
+       assert.NotEmpty(t, address)
+}
+
+func TestRefreshAliveLookup(t *testing.T) {
+       mockServer := httptest.NewServer(http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) {
+               switch r.URL.Path {
+               case "/api/v1/auth/login":
+                       w.Header().Set("Content-Type", "application/json")
+                       w.WriteHeader(http.StatusOK)
+                       _ = json.NewEncoder(w).Encode(map[string]string{
+                               "code": "200",
+                               "data": "mock-jwt-token",
+                       })
+
+               case "/metadata/v1/cluster":
+                       resp := metadata.MetadataResponse{
+                               Term: 1,
+                               Nodes: []*metadata.Node{
+                                       {
+                                               Control:     
&metadata.Endpoint{Host: "127.0.0.1", Port: 7001},
+                                               Transaction: 
&metadata.Endpoint{Host: "127.0.0.1", Port: 8001},
+                                               Internal:    
&metadata.Endpoint{Host: "127.0.0.1", Port: 9001},
+                                               Group:       "default",
+                                               Role:        metadata.LEADER,
+                                               Version:     "2.5.0",
+                                               Metadata:    
map[string]interface{}{},
+                                       },
+                               },
+                       }
+                       w.Header().Set("Content-Type", "application/json")
+                       w.WriteHeader(http.StatusOK)
+                       _ = json.NewEncoder(w).Encode(resp)
+
+               default:
+                       w.WriteHeader(http.StatusNotFound)
+               }
+       }))
+       defer mockServer.Close()
+
+       mockAddr := mockServer.Listener.Addr().String()
+       testRegistryConfig := &RegistryConfig{
+               Type:             "raft",
+               NamingserverAddr: mockAddr,
+               Username:         "seata",
+               Password:         "seata",
+               Raft: RaftConfig{
+                       MetadataMaxAgeMs:            int64(30000),
+                       ServerAddr:                  mockAddr,
+                       TokenValidityInMilliseconds: int64(1740000),
+               },
+       }
+
+       service := NewRaftRegistryService(serviceConfig, testRegistryConfig)
+       defer service.Close()
+
+       _, err := service.Lookup("default_tx_group")
+       assert.NoError(t, err)
+
+       aliveInstances := []*ServiceInstance{
+               {Addr: "127.0.0.1", Port: 8001},
+               {Addr: "127.0.0.1", Port: 8002},
+               {Addr: "127.0.0.1", Port: 8003},
+       }
+
+       result, err := service.RefreshAliveLookup("default_tx_group", 
aliveInstances)
+       assert.NoError(t, err)
+
+       expectedResult := []*ServiceInstance{
+               {Addr: "127.0.0.1", Port: 8002},
+               {Addr: "127.0.0.1", Port: 8003},
+       }
+       assert.Equal(t, expectedResult, result)
+
+       emptyResult, err := service.RefreshAliveLookup("default_tx_group", 
[]*ServiceInstance{})
+       assert.NoError(t, err)
+       assert.Empty(t, emptyResult)
+
+       _, err = service.RefreshAliveLookup("non_existent_group", 
aliveInstances)
+       assert.Error(t, err)
+       assert.Contains(t, err.Error(), "cluster not found")
+}
+
+func TestTypeAssertionSafety(t *testing.T) {
+       service := &RaftRegistryService{
+               aliveNodes:    sync.Map{},
+               vgroupMapping: map[string]string{"test_group": "test_cluster"},
+               metadata:      metadata.NewMetadata(),
+               random:        rand.New(rand.NewSource(time.Now().UnixNano())),
+       }
+
+       service.metadata.RefreshMetadata("test_cluster", 
metadata.MetadataResponse{
+               Term: 1,
+               Nodes: []*metadata.Node{
+                       {
+                               Control:     &metadata.Endpoint{Host: 
"127.0.0.1", Port: 7001},
+                               Transaction: &metadata.Endpoint{Host: 
"127.0.0.1", Port: 8001},
+                               Internal:    &metadata.Endpoint{Host: 
"127.0.0.1", Port: 9001},
+                               Group:       "",
+                               Role:        metadata.LEADER,
+                               Version:     "2.5.0",
+                               Metadata:    map[string]interface{}{},
+                       },
+               },
+       })
+
+       service.aliveNodes.Store("test_group", "invalid_type_string")
+       service.currentTransactionServiceGroup = "test_group"
+
+       _, err := service.queryHttpAddress("test_cluster", "")
+       assert.Error(t, err)
+       assert.Contains(t, err.Error(), "invalid alive nodes type")
+
+       service.aliveNodes.Store("test_group", []*ServiceInstance{
+               {Addr: "127.0.0.1", Port: 8001},
+       })
+
+       address, err := service.queryHttpAddress("test_cluster", "")
+       assert.NoError(t, err)
+       assert.NotEmpty(t, address)
+}
diff --git a/testdata/conf/seatago.yml b/testdata/conf/seatago.yml
index fa8a7f82..6edb83e0 100644
--- a/testdata/conf/seatago.yml
+++ b/testdata/conf/seatago.yml
@@ -120,7 +120,10 @@ seata:
       data-id: seata.properties
   # Registration Center
   registry:
-    type: file
+    type: raft
+    namingserver-addr: 127.0.0.1:8081
+    username: seata
+    password: seata
     file:
       name: seatago.yml
     nacos:
@@ -136,6 +139,10 @@ seata:
     etcd3:
       cluster: "default"
       server-addr: "http://localhost:2379";
+    raft:
+      metadata-max-age-ms: 30000
+      server-ddr:
+      token-validity-in-milliseconds: 1740000
   log:
     exception-rate: 100
   tcc:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to