This is an automated email from the ASF dual-hosted git repository.
goodboycoder pushed a commit to branch feature/support-raft
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git
The following commit(s) were added to refs/heads/feature/support-raft by this
push:
new dae0737a OSPP 2025 Go&Java Version alignment / support-raft (#923)
dae0737a is described below
commit dae0737a2b93ba18615eaa46f406a61f208b9e37
Author: flypiggy <[email protected]>
AuthorDate: Thu Oct 30 22:59:23 2025 +0800
OSPP 2025 Go&Java Version alignment / support-raft (#923)
upd-raft
---
pkg/discovery/base.go | 1 +
pkg/discovery/config.go | 27 +-
pkg/discovery/init.go | 2 +
pkg/discovery/{base.go => metadata/constant.go} | 31 +-
pkg/discovery/metadata/metadata.go | 161 ++++++++
pkg/discovery/{base.go => metadata/node.go} | 34 +-
pkg/discovery/raft.go | 522 ++++++++++++++++++++++++
pkg/discovery/raft_test.go | 450 ++++++++++++++++++++
testdata/conf/seatago.yml | 9 +-
9 files changed, 1198 insertions(+), 39 deletions(-)
diff --git a/pkg/discovery/base.go b/pkg/discovery/base.go
index fcc0d016..f3b5a7a5 100644
--- a/pkg/discovery/base.go
+++ b/pkg/discovery/base.go
@@ -26,6 +26,7 @@ const (
ZK string = "zk"
CONSUL string = "consul"
SOFA string = "sofa"
+ RAFT string = "raft"
)
type ServiceInstance struct {
diff --git a/pkg/discovery/config.go b/pkg/discovery/config.go
index d8a78874..e80c046a 100644
--- a/pkg/discovery/config.go
+++ b/pkg/discovery/config.go
@@ -38,17 +38,24 @@ func (cfg *ServiceConfig) RegisterFlagsWithPrefix(prefix
string, f *flag.FlagSet
}
type RegistryConfig struct {
- Type string `yaml:"type" json:"type" koanf:"type"`
- File FileConfig `yaml:"file" json:"file" koanf:"file"`
- Nacos NacosConfig `yaml:"nacos" json:"nacos" koanf:"nacos"`
- Etcd3 Etcd3Config `yaml:"etcd3" json:"etcd3" koanf:"etcd3"`
+ Type string `yaml:"type" json:"type" koanf:"type"`
+ NamingserverAddr string `yaml:"namingserver-addr"
json:"namingserver-addr" koanf:"namingserver-addr"`
+ Username string `yaml:"username" json:"username"
koanf:"username"`
+ Password string `yaml:"password" json:"password"
koanf:"password"`
+ File FileConfig `yaml:"file" json:"file" koanf:"file"`
+ Nacos NacosConfig `yaml:"nacos" json:"nacos" koanf:"nacos"`
+ Etcd3 Etcd3Config `yaml:"etcd3" json:"etcd3" koanf:"etcd3"`
+ Raft RaftConfig `yaml:"raft" json:"raft" koanf:"raft"`
}
func (cfg *RegistryConfig) RegisterFlagsWithPrefix(prefix string, f
*flag.FlagSet) {
f.StringVar(&cfg.Type, prefix+".type", "file", "The registry type.")
+ f.StringVar(&cfg.Username, prefix+".username", "seata", "Username for
authentication")
+ f.StringVar(&cfg.Password, prefix+".password", "seata", "Password for
authentication")
cfg.File.RegisterFlagsWithPrefix(prefix+".file", f)
cfg.Nacos.RegisterFlagsWithPrefix(prefix+".nacos", f)
cfg.Etcd3.RegisterFlagsWithPrefix(prefix+".etcd3", f)
+ cfg.Raft.RegisterFlagsWithPrefix(prefix+".raft", f)
}
type FileConfig struct {
@@ -90,3 +97,15 @@ func (cfg *Etcd3Config) RegisterFlagsWithPrefix(prefix
string, f *flag.FlagSet)
f.StringVar(&cfg.Cluster, prefix+".cluster", "default", "The server
address of registry.")
f.StringVar(&cfg.ServerAddr, prefix+".server-addr",
"http://localhost:2379", "The server address of registry.")
}
+
+// RaftConfig holds the settings of the raft-based registry.
+type RaftConfig struct {
+	// MetadataMaxAgeMs is the maximum age of cached cluster metadata, in
+	// milliseconds, before it is refreshed.
+	MetadataMaxAgeMs int64 `yaml:"metadata-max-age-ms" json:"metadata-max-age-ms" koanf:"metadata-max-age-ms"`
+	// ServerAddr is the comma-separated list of host:port addresses used to
+	// bootstrap the cluster lookup. The yaml key is "server-addr" so that it
+	// agrees with the json/koanf tags and with the ".server-addr" flag.
+	ServerAddr string `yaml:"server-addr" json:"server-addr" koanf:"server-addr"`
+	// TokenValidityInMilliseconds is how long an obtained auth token is
+	// treated as valid, in milliseconds.
+	TokenValidityInMilliseconds int64 `yaml:"token-validity-in-milliseconds" json:"token-validity-in-milliseconds" koanf:"token-validity-in-milliseconds"`
+}
+
+// RegisterFlagsWithPrefix registers the raft registry flags on f. Every flag
+// name is built by prepending prefix to the option's own suffix.
+func (cfg *RaftConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
+	const (
+		defaultMetadataMaxAgeMs int64 = 30000
+		defaultServerAddr             = "127.0.0.1:7091"
+		defaultTokenValidityMs  int64 = 1740000
+	)
+
+	maxAgeFlag := prefix + ".metadata-max-age-ms"
+	serverAddrFlag := prefix + ".server-addr"
+	tokenValidityFlag := prefix + ".token-validity-in-milliseconds"
+
+	f.Int64Var(&cfg.MetadataMaxAgeMs, maxAgeFlag, defaultMetadataMaxAgeMs, "Maximum age of metadata in milliseconds before refresh")
+	f.StringVar(&cfg.ServerAddr, serverAddrFlag, defaultServerAddr, "The server address of raft registry")
+	f.Int64Var(&cfg.TokenValidityInMilliseconds, tokenValidityFlag, defaultTokenValidityMs, "Token validity duration in milliseconds")
+}
diff --git a/pkg/discovery/init.go b/pkg/discovery/init.go
index 8e7f5e38..3fb6872d 100644
--- a/pkg/discovery/init.go
+++ b/pkg/discovery/init.go
@@ -35,6 +35,8 @@ func InitRegistry(serviceConfig *ServiceConfig,
registryConfig *RegistryConfig)
case ETCD:
//init etcd registry
registryService = newEtcdRegistryService(serviceConfig,
&registryConfig.Etcd3)
+ case RAFT:
+ registryService = NewRaftRegistryService(serviceConfig,
registryConfig)
case NACOS:
//TODO: init nacos registry
case EUREKA:
diff --git a/pkg/discovery/base.go b/pkg/discovery/metadata/constant.go
similarity index 69%
copy from pkg/discovery/base.go
copy to pkg/discovery/metadata/constant.go
index fcc0d016..a86ba903 100644
--- a/pkg/discovery/base.go
+++ b/pkg/discovery/metadata/constant.go
@@ -15,25 +15,22 @@
* limitations under the License.
*/
-package discovery
+package metadata
+
+type ClusterRole string
const (
- FILE string = "file"
- NACOS string = "nacos"
- ETCD string = "etcd"
- EUREKA string = "eureka"
- REDIS string = "redis"
- ZK string = "zk"
- CONSUL string = "consul"
- SOFA string = "sofa"
+ LEADER ClusterRole = "LEADER"
+ FOLLOWER ClusterRole = "FOLLOWER"
+ LEARNER ClusterRole = "LEARNER"
+ MEMBER ClusterRole = "MEMBER"
)
-type ServiceInstance struct {
- Addr string
- Port int
-}
+type StoreMode string
-type RegistryService interface {
- Lookup(key string) ([]*ServiceInstance, error)
- Close()
-}
+const (
+ FILE StoreMode = "file"
+ DB StoreMode = "db"
+ REDIS StoreMode = "redis"
+ RAFT StoreMode = "raft"
+)
diff --git a/pkg/discovery/metadata/metadata.go
b/pkg/discovery/metadata/metadata.go
new file mode 100644
index 00000000..0b088daf
--- /dev/null
+++ b/pkg/discovery/metadata/metadata.go
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package metadata
+
+import (
+ "math/rand"
+ "sync"
+)
+
+// MetadataResponse is the cluster description handed to
+// Metadata.RefreshMetadata.
+type MetadataResponse struct {
+ // Nodes lists the cluster members; RefreshMetadata records those whose
+ // Role is LEADER as group leaders.
+ Nodes []*Node
+ // StoreMode is the store mode name; RefreshMetadata maps "RAFT" to RAFT
+ // and anything else to FILE.
+ StoreMode string
+ // Term is stored per group by RefreshMetadata.
+ Term int64
+}
+
+// Metadata caches, per cluster name, the known leaders, terms and node lists.
+// Each top-level sync.Map value is itself a *sync.Map keyed by group.
+type Metadata struct {
+ leaders sync.Map // clusterName -> sync.Map(group -> *Node)
+ clusterTerm sync.Map // clusterName -> sync.Map(group -> int64)
+ clusterNodes sync.Map // clusterName -> sync.Map(group -> []*Node)
+ // storeMode is set by NewMetadata (FILE) and overwritten by RefreshMetadata.
+ storeMode StoreMode
+}
+
+// NewMetadata returns an empty Metadata whose store mode is FILE.
+func NewMetadata() *Metadata {
+	m := new(Metadata)
+	m.storeMode = FILE
+	return m
+}
+
+// GetLeader returns one of the leaders recorded for clusterName, chosen at
+// random across its groups, or nil when no leader is known.
+func (m *Metadata) GetLeader(clusterName string) *Node {
+	stored, found := m.leaders.Load(clusterName)
+	if !found {
+		return nil
+	}
+	byGroup, isMap := stored.(*sync.Map)
+	if !isMap {
+		return nil
+	}
+
+	// Collect every group's leader, then pick one.
+	var candidates []*Node
+	byGroup.Range(func(_, v any) bool {
+		if leader, isNode := v.(*Node); isNode {
+			candidates = append(candidates, leader)
+		}
+		return true
+	})
+	if len(candidates) == 0 {
+		return nil
+	}
+	return candidates[rand.Intn(len(candidates))]
+}
+
+// GetNodes returns the nodes stored for clusterName. With a non-empty group
+// only that group's nodes are returned; with an empty group the nodes of all
+// groups are concatenated. It returns nil when nothing is stored.
+func (m *Metadata) GetNodes(clusterName, group string) []*Node {
+	stored, found := m.clusterNodes.Load(clusterName)
+	if !found {
+		return nil
+	}
+	byGroup, isMap := stored.(*sync.Map)
+	if !isMap {
+		return nil
+	}
+
+	if group != "" {
+		entry, exists := byGroup.Load(group)
+		if !exists {
+			return nil
+		}
+		// A failed assertion leaves list nil, which is the "not found" result.
+		list, _ := entry.([]*Node)
+		return list
+	}
+
+	var all []*Node
+	byGroup.Range(func(_, v any) bool {
+		if list, isList := v.([]*Node); isList {
+			all = append(all, list...)
+		}
+		return true
+	})
+	return all
+}
+
+// SetNodes stores nodes as the node list of group within clusterName,
+// creating the per-cluster map on first use.
+func (m *Metadata) SetNodes(clusterName, group string, nodes []*Node) {
+	stored, _ := m.clusterNodes.LoadOrStore(clusterName, new(sync.Map))
+	stored.(*sync.Map).Store(group, nodes)
+}
+
+// ContainsGroup reports whether a node map has been stored for clusterName.
+func (m *Metadata) ContainsGroup(clusterName string) bool {
+	if _, found := m.clusterNodes.Load(clusterName); found {
+		return true
+	}
+	return false
+}
+
+// Groups returns the group names that have a node list stored under
+// clusterName, or nil when the cluster is unknown.
+func (m *Metadata) Groups(clusterName string) []string {
+	stored, found := m.clusterNodes.Load(clusterName)
+	if !found {
+		return nil
+	}
+	byGroup, isMap := stored.(*sync.Map)
+	if !isMap {
+		return nil
+	}
+
+	var names []string
+	byGroup.Range(func(k, _ any) bool {
+		if name, isString := k.(string); isString {
+			names = append(names, name)
+		}
+		return true
+	})
+	return names
+}
+
+// GetClusterTerm returns a snapshot of the group -> term entries stored for
+// clusterName, or nil when no terms have been recorded for that cluster.
+func (m *Metadata) GetClusterTerm(clusterName string) map[string]int64 {
+	stored, found := m.clusterTerm.Load(clusterName)
+	if !found {
+		return nil
+	}
+	terms, isMap := stored.(*sync.Map)
+	if !isMap {
+		return nil
+	}
+
+	snapshot := make(map[string]int64)
+	terms.Range(func(k, v any) bool {
+		group, keyOK := k.(string)
+		term, valueOK := v.(int64)
+		if keyOK && valueOK {
+			snapshot[group] = term
+		}
+		return true
+	})
+	return snapshot
+}
+
+// RefreshMetadata updates the cached state of clusterName from response.
+//
+// Every node whose Role is LEADER is recorded as the leader of its own group.
+// The store mode becomes RAFT when response.StoreMode names raft (upper- or
+// lower-case spelling) and FILE otherwise. When the response carries at least
+// one node, the whole node list and response.Term are stored under the group
+// of the first node.
+func (m *Metadata) RefreshMetadata(clusterName string, response MetadataResponse) {
+	nodes := make([]*Node, 0, len(response.Nodes))
+	for _, node := range response.Nodes {
+		if node == nil {
+			// A null element of a decoded array is a nil pointer; skip it
+			// rather than dereferencing it below.
+			continue
+		}
+		if node.Role == LEADER {
+			groupMapAny, _ := m.leaders.LoadOrStore(clusterName, &sync.Map{})
+			groupMapAny.(*sync.Map).Store(node.Group, node)
+		}
+		nodes = append(nodes, node)
+	}
+
+	// Accept both "RAFT" and the value of the RAFT constant ("raft") so the
+	// mode is recognised regardless of which spelling the server reports.
+	switch response.StoreMode {
+	case "RAFT", string(RAFT):
+		m.storeMode = RAFT
+	default:
+		m.storeMode = FILE
+	}
+
+	if len(nodes) == 0 {
+		return
+	}
+	group := nodes[0].Group
+	m.SetNodes(clusterName, group, nodes)
+	termMapAny, _ := m.clusterTerm.LoadOrStore(clusterName, &sync.Map{})
+	termMapAny.(*sync.Map).Store(group, response.Term)
+}
diff --git a/pkg/discovery/base.go b/pkg/discovery/metadata/node.go
similarity index 68%
copy from pkg/discovery/base.go
copy to pkg/discovery/metadata/node.go
index fcc0d016..ab931073 100644
--- a/pkg/discovery/base.go
+++ b/pkg/discovery/metadata/node.go
@@ -15,25 +15,25 @@
* limitations under the License.
*/
-package discovery
+package metadata
-const (
- FILE string = "file"
- NACOS string = "nacos"
- ETCD string = "etcd"
- EUREKA string = "eureka"
- REDIS string = "redis"
- ZK string = "zk"
- CONSUL string = "consul"
- SOFA string = "sofa"
-)
+type Node struct {
+ Control *Endpoint
+ Transaction *Endpoint
+ Internal *Endpoint
-type ServiceInstance struct {
- Addr string
- Port int
+ Weight float64
+ Healthy bool
+ Timestamp int64
+
+ Group string
+ Role ClusterRole
+ Version string
+ Metadata map[string]interface{}
}
-type RegistryService interface {
- Lookup(key string) ([]*ServiceInstance, error)
- Close()
+type Endpoint struct {
+ Host string
+ Protocol string
+ Port int
}
diff --git a/pkg/discovery/raft.go b/pkg/discovery/raft.go
new file mode 100644
index 00000000..c1e573a4
--- /dev/null
+++ b/pkg/discovery/raft.go
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package discovery
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "math/rand"
+ "net"
+ "net/http"
+ "net/url"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "seata.apache.org/seata-go/pkg/discovery/metadata"
+ "seata.apache.org/seata-go/pkg/util/log"
+)
+
+// Endpoint kinds understood by RaftRegistryService.selectEndpoint.
+const (
+ controlEndpoint = "control"
+ transactionEndpoint = "transaction"
+)
+
+// RaftRegistryService resolves transaction service groups to server
+// instances using cluster metadata fetched over HTTP, and keeps that metadata
+// fresh from a background goroutine started by startQueryMetadata.
+type RaftRegistryService struct {
+ cfg *RaftConfig
+ metadata *metadata.Metadata
+ initAddresses sync.Map // clusterName ->
[]*ServiceInstance
+ aliveNodes sync.Map // transactionServiceGroup ->
[]*ServiceInstance
+ vgroupMapping map[string]string
+ namingserverAddress string
+ username string
+ password string
+ jwtToken string
+ tokenTimestamp int64
+ currentTransactionServiceGroup string
+ currentTransactionClusterName string
+ mu sync.RWMutex
+ stopCh chan struct{}
+ refreshOnce sync.Once
+ httpClient *http.Client
+ random *rand.Rand
+}
+
+// NewRaftRegistryService builds a RaftRegistryService from the service and
+// registry configuration. No network traffic happens here; the token
+// timestamp starts at -1 so the first use is treated as expired.
+func NewRaftRegistryService(config *ServiceConfig, raftConfig *RegistryConfig) *RaftRegistryService {
+	seed := time.Now().UnixNano()
+
+	// The sync.Map fields are usable as zero values and need no initializer.
+	svc := new(RaftRegistryService)
+	svc.cfg = &raftConfig.Raft
+	svc.metadata = metadata.NewMetadata()
+	svc.vgroupMapping = config.VgroupMapping
+	svc.namingserverAddress = raftConfig.NamingserverAddr
+	svc.username = raftConfig.Username
+	svc.password = raftConfig.Password
+	svc.stopCh = make(chan struct{})
+	svc.httpClient = &http.Client{}
+	svc.tokenTimestamp = -1
+	svc.random = rand.New(rand.NewSource(seed))
+	return svc
+}
+
+// Lookup resolves the transaction service group key to the transaction
+// endpoints of its cluster.
+//
+// When no metadata is cached for the cluster yet, the configured server
+// addresses are parsed once and remembered as init addresses, then a token is
+// obtained if needed, the cluster metadata is fetched and the background
+// refresh goroutine is started. If the fetch fails, the next call retries it:
+// the init addresses being already stored no longer short-circuits the
+// bootstrap, which previously left the service returning (nil, nil) forever
+// after a single transient failure.
+//
+// It returns an error when key has no cluster mapping, when no configured
+// address is valid, or when the token or metadata request fails.
+func (r *RaftRegistryService) Lookup(key string) ([]*ServiceInstance, error) {
+	clusterName := r.vgroupMapping[key]
+	if clusterName == "" {
+		return nil, fmt.Errorf("cluster doesnt exist")
+	}
+	// NOTE(review): these two fields are also read by the refresh goroutine
+	// and by queryHttpAddress without synchronization.
+	r.currentTransactionServiceGroup = key
+	r.currentTransactionClusterName = clusterName
+
+	if r.metadata.ContainsGroup(clusterName) {
+		return r.getServiceInstances(clusterName, "")
+	}
+
+	if _, ok := r.loadInitAddresses(clusterName); !ok {
+		if r.cfg.ServerAddr == "" {
+			// Nothing to bootstrap from; report whatever is cached.
+			return r.getServiceInstances(clusterName, "")
+		}
+		addrs := strings.Split(r.cfg.ServerAddr, ",")
+		list := make([]*ServiceInstance, 0, len(addrs))
+		for _, addr := range addrs {
+			h, p, err := net.SplitHostPort(strings.TrimSpace(addr))
+			if err != nil {
+				log.Infof("invalid init server addr: %s, err: %v", addr, err)
+				continue
+			}
+			port, err := strconv.Atoi(p)
+			if err != nil {
+				log.Errorf("invalid port: %s", p)
+				continue
+			}
+			list = append(list, &ServiceInstance{Addr: h, Port: port})
+		}
+		if len(list) == 0 {
+			return nil, fmt.Errorf("invalid service group/key: %s", key)
+		}
+		r.initAddresses.Store(clusterName, list)
+	}
+
+	// Reached on the first call and on every later call for which the
+	// metadata is still missing, so a failed bootstrap is retried.
+	if r.isTokenExpired() {
+		if err := r.refreshToken(); err != nil {
+			return nil, err
+		}
+	}
+	if err := r.acquireClusterMetaData(clusterName, ""); err != nil {
+		return nil, err
+	}
+	r.startQueryMetadata()
+
+	return r.getServiceInstances(clusterName, "")
+}
+
+// aliveLookup returns the transaction endpoint of a known leader of the
+// cluster mapped to transactionServiceGroup. When no leader is known it falls
+// back to the transaction endpoints of all cached nodes.
+func (r *RaftRegistryService) aliveLookup(transactionServiceGroup string) ([]*ServiceInstance, error) {
+	clusterName := r.vgroupMapping[transactionServiceGroup]
+	if clusterName == "" {
+		return nil, fmt.Errorf("cluster doesnt exist")
+	}
+
+	leader := r.metadata.GetLeader(clusterName)
+	if leader == nil {
+		return r.getServiceInstances(clusterName, "")
+	}
+
+	inst, err := r.selectEndpoint(transactionEndpoint, leader)
+	if err != nil {
+		return nil, err
+	}
+	return []*ServiceInstance{inst}, nil
+}
+
+// getServiceInstances converts the cached nodes of clusterName/group into
+// their transaction endpoints. Nodes whose endpoint cannot be selected are
+// left out. It returns (nil, nil) when no nodes are cached.
+func (r *RaftRegistryService) getServiceInstances(clusterName, group string) ([]*ServiceInstance, error) {
+	nodes := r.metadata.GetNodes(clusterName, group)
+	if len(nodes) == 0 {
+		return nil, nil
+	}
+
+	result := make([]*ServiceInstance, 0, len(nodes))
+	for i := range nodes {
+		// The error is deliberately dropped: a nil instance is the skip signal.
+		if inst, _ := r.selectEndpoint(transactionEndpoint, nodes[i]); inst != nil {
+			result = append(result, inst)
+		}
+	}
+	return result, nil
+}
+
+// RefreshAliveLookup records, for transactionServiceGroup, the entries of
+// aliveAddress that are not the current leader's transaction endpoint, and
+// returns that filtered list. It fails when the group has no cluster mapping,
+// when no leader is known, or when the leader endpoint cannot be selected.
+func (r *RaftRegistryService) RefreshAliveLookup(transactionServiceGroup string, aliveAddress []*ServiceInstance) ([]*ServiceInstance, error) {
+	clusterName := r.vgroupMapping[transactionServiceGroup]
+	if clusterName == "" {
+		return nil, fmt.Errorf("cluster not found for serviceGroup=%s", transactionServiceGroup)
+	}
+
+	leader := r.metadata.GetLeader(clusterName)
+	if leader == nil {
+		return nil, fmt.Errorf("leader not found for cluster=%s", clusterName)
+	}
+
+	leaderInst, err := r.selectEndpoint(transactionEndpoint, leader)
+	if err != nil {
+		return nil, err
+	}
+
+	var others []*ServiceInstance
+	for _, candidate := range aliveAddress {
+		isLeader := candidate.Port == leaderInst.Port && candidate.Addr == leaderInst.Addr
+		if isLeader {
+			continue
+		}
+		others = append(others, candidate)
+	}
+
+	r.aliveNodes.Store(transactionServiceGroup, others)
+	return others, nil
+}
+
+// Close signals the background refresh goroutine to stop by closing stopCh.
+// A call made after the channel is already closed is a no-op.
+func (r *RaftRegistryService) Close() {
+	select {
+	case <-r.stopCh:
+		// Already closed.
+		return
+	default:
+	}
+	close(r.stopCh)
+}
+
+// selectEndpoint returns the host/port of the endpoint of kind t
+// (controlEndpoint or transactionEndpoint) of node n.
+//
+// It returns an error, instead of panicking, when n is nil, when n lacks the
+// requested endpoint, or when t is not a supported kind.
+func (r *RaftRegistryService) selectEndpoint(t string, n *metadata.Node) (*ServiceInstance, error) {
+	if n == nil {
+		return nil, fmt.Errorf("SelectEndpoint: node is nil")
+	}
+
+	var ep *metadata.Endpoint
+	switch t {
+	case controlEndpoint:
+		ep = n.Control
+	case transactionEndpoint:
+		ep = n.Transaction
+	default:
+		return nil, fmt.Errorf("SelectEndpoint is not support type: %s", t)
+	}
+
+	// Endpoints are pointers and may be absent from the decoded metadata.
+	if ep == nil {
+		return nil, fmt.Errorf("SelectEndpoint: node has no %s endpoint", t)
+	}
+	return &ServiceInstance{Addr: ep.Host, Port: ep.Port}, nil
+}
+
+// startQueryMetadata launches, at most once, the goroutine that keeps the
+// cluster metadata fresh. Every five seconds it re-fetches the metadata of
+// the current cluster when the cached copy is older than the configured
+// maximum age (30000 ms when unset), or otherwise when watch reports a change
+// or fails. The goroutine exits once stopCh is closed.
+func (r *RaftRegistryService) startQueryMetadata() {
+	r.refreshOnce.Do(func() {
+		go func() {
+			maxAgeMs := r.cfg.MetadataMaxAgeMs
+			if maxAgeMs <= 0 {
+				maxAgeMs = int64(30000)
+			}
+			lastFetch := time.Now().UnixMilli()
+			ticker := time.NewTicker(5 * time.Second)
+			defer ticker.Stop()
+
+			for {
+				select {
+				case <-r.stopCh:
+					log.Info("raft registry service stopped")
+					return
+				case <-ticker.C:
+				}
+
+				needFetch := time.Now().UnixMilli()-lastFetch > maxAgeMs
+				if !needFetch {
+					changed, err := r.watch()
+					if err != nil {
+						log.Errorf("watch error: %v", err)
+						needFetch = true
+					} else {
+						needFetch = changed
+					}
+				}
+				if !needFetch {
+					continue
+				}
+
+				clusterName := r.currentTransactionClusterName
+				groups := r.metadata.Groups(clusterName)
+				if len(groups) == 0 {
+					groups = append(groups, "")
+				}
+				for _, g := range groups {
+					if err := r.acquireClusterMetaData(clusterName, g); err != nil {
+						log.Errorf("acquire cluster metadata failed: cluster=%s group=%s err=%v", clusterName, g, err)
+					}
+				}
+				lastFetch = time.Now().UnixMilli()
+			}
+		}()
+	})
+}
+
+// watch posts the known group terms to the /metadata/v1/watch endpoint of one
+// cluster node and reports whether the node answered 200 OK.
+//
+// Only one request is issued: the function returns after the first group for
+// which an address could be resolved. It returns (false, nil) when no address
+// is available, and an error when the token refresh or the request fails or
+// the node answers 401.
+func (r *RaftRegistryService) watch() (bool, error) {
+	for _, clusterName := range r.clusterNamesFromInit() {
+		groupTerms := r.metadata.GetClusterTerm(clusterName)
+		if groupTerms == nil {
+			groupTerms = map[string]int64{"": 0}
+		}
+		for group := range groupTerms {
+			tcAddress, err := r.queryHttpAddress(clusterName, group)
+			if err != nil {
+				log.Infof("no tc address to watch for cluster %s: %v", clusterName, err)
+				continue
+			}
+			if r.isTokenExpired() {
+				if err = r.refreshToken(); err != nil {
+					return false, err
+				}
+			}
+
+			form := url.Values{}
+			for k, v := range groupTerms {
+				form.Set(k, strconv.FormatInt(v, 10))
+			}
+
+			endpoint := fmt.Sprintf("http://%s/metadata/v1/watch", tcAddress)
+			req, err := http.NewRequest("POST", endpoint, strings.NewReader(form.Encode()))
+			if err != nil {
+				return false, err
+			}
+			req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+
+			// refreshToken writes jwtToken under r.mu, so read it under the
+			// same lock.
+			r.mu.RLock()
+			token := r.jwtToken
+			r.mu.RUnlock()
+			if token != "" {
+				req.Header.Set("Authorization", token)
+			}
+
+			resp, err := r.doRequest(req, 30*time.Second)
+			if err != nil {
+				log.Errorf("watch cluster node: %s, fail: %v", tcAddress, err)
+				return false, err
+			}
+			// Safe inside the loop: every path below returns immediately.
+			defer resp.Body.Close()
+			if resp.StatusCode == http.StatusUnauthorized {
+				return false, errors.New("authentication failed: missing username/password")
+			}
+			return resp.StatusCode == http.StatusOK, nil
+		}
+	}
+	return false, nil
+}
+
+// acquireClusterMetaData fetches /metadata/v1/cluster for group from one node
+// of clusterName and feeds the decoded response into the metadata cache.
+//
+// The token is refreshed first when it has expired. On 401 the token is
+// refreshed for the next attempt and an authentication error is returned. Any
+// other non-200 status, a failed request, an unreadable body or an
+// undecodable body is also returned as an error.
+func (r *RaftRegistryService) acquireClusterMetaData(clusterName, group string) error {
+	tcAddress, err := r.queryHttpAddress(clusterName, group)
+	if err != nil {
+		return err
+	}
+	if r.isTokenExpired() {
+		if err = r.refreshToken(); err != nil {
+			return err
+		}
+	}
+
+	u := fmt.Sprintf("http://%s/metadata/v1/cluster", tcAddress)
+	req, err := http.NewRequest("GET", u, nil)
+	if err != nil {
+		return err
+	}
+	q := req.URL.Query()
+	q.Add("group", group)
+	req.URL.RawQuery = q.Encode()
+	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+
+	// refreshToken writes jwtToken under r.mu, so read it under the same lock.
+	r.mu.RLock()
+	token := r.jwtToken
+	r.mu.RUnlock()
+	if token != "" {
+		req.Header.Set("Authorization", token)
+	}
+
+	resp, err := r.doRequest(req, 1*time.Second)
+	if err != nil {
+		return fmt.Errorf("http get cluster failed: %w", err)
+	}
+	defer resp.Body.Close()
+
+	switch resp.StatusCode {
+	case http.StatusOK:
+		body, err := io.ReadAll(resp.Body)
+		if err != nil {
+			// Report the read failure itself rather than a misleading
+			// unmarshal error on a truncated body.
+			return fmt.Errorf("read cluster metadata response failed: %w", err)
+		}
+		var mr metadata.MetadataResponse
+		if err = json.Unmarshal(body, &mr); err != nil {
+			return fmt.Errorf("unmarshal metadataResponse failed: %w", err)
+		}
+		r.metadata.RefreshMetadata(clusterName, mr)
+		return nil
+	case http.StatusUnauthorized:
+		if err = r.refreshToken(); err != nil {
+			return err
+		}
+		return fmt.Errorf("authentication failed! you should configure the correct username and password")
+	default:
+		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+	}
+}
+
+/* -------------------- Token management -------------------- */
+
+// isTokenExpired reports whether a token still has to be obtained (timestamp
+// -1) or the current one is older than the configured validity, which
+// defaults to 29 minutes when not set to a positive value.
+func (r *RaftRegistryService) isTokenExpired() bool {
+	r.mu.RLock()
+	issuedAt := r.tokenTimestamp
+	r.mu.RUnlock()
+
+	if issuedAt == -1 {
+		return true
+	}
+
+	validity := r.cfg.TokenValidityInMilliseconds
+	if validity <= 0 {
+		validity = int64(29 * 60 * 1000)
+	}
+	return time.Now().UnixMilli() >= issuedAt+validity
+}
+
+// refreshToken logs in at the naming server with the configured username and
+// password and stores the returned token together with the current time.
+//
+// It returns an error when the request fails, the status is not 200, the body
+// cannot be read or decoded, the response carries a non-"200" code, or the
+// "data" field is not a string token.
+func (r *RaftRegistryService) refreshToken() error {
+	address := r.namingserverAddress
+	// Marshalling a map[string]string cannot fail, so the error is ignored.
+	body, _ := json.Marshal(map[string]string{
+		"username": r.username,
+		"password": r.password,
+	})
+	req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/v1/auth/login", address), bytes.NewReader(body))
+	if err != nil {
+		return err
+	}
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err := r.doRequest(req, 1*time.Second)
+	if err != nil {
+		return fmt.Errorf("refresh token failed: %w", err)
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		return errors.New("authentication failed when refresh token")
+	}
+
+	respBody, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return fmt.Errorf("read auth response failed: %w", err)
+	}
+	var node map[string]interface{}
+	if err = json.Unmarshal(respBody, &node); err != nil {
+		return fmt.Errorf("invalid auth response: %w", err)
+	}
+	code, _ := node["code"].(string)
+	if code != "" && code != "200" {
+		return errors.New("authentication failed! you should configure the correct username and password")
+	}
+
+	// Checked assertion: a missing or non-string "data" field must not panic.
+	token, ok := node["data"].(string)
+	if !ok {
+		return errors.New("invalid auth response: token is missing or not a string")
+	}
+
+	r.mu.Lock()
+	r.jwtToken = token
+	r.tokenTimestamp = time.Now().UnixMilli()
+	r.mu.Unlock()
+	return nil
+}
+
+// queryHttpAddress picks, at random, the "host:port" of a control endpoint to
+// send HTTP requests to for clusterName/group.
+//
+// Candidates are chosen as follows:
+//   - metadata has nodes and alive nodes are recorded for the current service
+//     group: the control endpoints of the cached nodes whose transaction
+//     endpoint matches one of the alive nodes;
+//   - metadata has nodes but no (or an empty) alive list: the control
+//     endpoints of all cached nodes;
+//   - metadata has no nodes: the init addresses stored for the cluster.
+//
+// It returns an error when no candidate is available or when a stored value
+// has an unexpected type.
+func (r *RaftRegistryService) queryHttpAddress(clusterName, group string) (string, error) {
+	var addressList []string
+
+	formatAddr := func(addr string, port int) string {
+		return fmt.Sprintf("%s:%d", addr, port)
+	}
+	appendControlAddr := func(node *metadata.Node) error {
+		endpoint, err := r.selectEndpoint(controlEndpoint, node)
+		if err != nil {
+			return fmt.Errorf("failed to select control endpoint: %v", err)
+		}
+		addressList = append(addressList, formatAddr(endpoint.Addr, endpoint.Port))
+		return nil
+	}
+
+	nodes := r.metadata.GetNodes(clusterName, group)
+	if len(nodes) > 0 {
+		currentServiceGroup := r.currentTransactionServiceGroup
+
+		var aliveNodes []*ServiceInstance
+		if aliveAny, found := r.aliveNodes.Load(currentServiceGroup); found {
+			typed, ok := aliveAny.([]*ServiceInstance)
+			if !ok {
+				log.Errorf("invalid type for alive nodes, expected []*ServiceInstance")
+				return "", fmt.Errorf("invalid alive nodes type for service group %s", currentServiceGroup)
+			}
+			aliveNodes = typed
+		}
+
+		if len(aliveNodes) == 0 {
+			// No alive information: every cached node is a candidate.
+			for _, node := range nodes {
+				if err := appendControlAddr(node); err != nil {
+					return "", err
+				}
+			}
+		} else {
+			// Index cached nodes by transaction address so alive entries can
+			// be mapped back to the node that owns them.
+			byTxAddr := make(map[string]*metadata.Node, len(nodes))
+			for _, node := range nodes {
+				txEndpoint, err := r.selectEndpoint(transactionEndpoint, node)
+				if err != nil {
+					// Skip the node instead of dereferencing a nil endpoint.
+					log.Errorf("failed to select transaction endpoint for node: %v", err)
+					continue
+				}
+				byTxAddr[formatAddr(txEndpoint.Addr, txEndpoint.Port)] = node
+			}
+
+			for _, alive := range aliveNodes {
+				if alive == nil {
+					continue
+				}
+				node, exists := byTxAddr[formatAddr(alive.Addr, alive.Port)]
+				if !exists {
+					continue
+				}
+				contrEndpoint, err := r.selectEndpoint(controlEndpoint, node)
+				if err != nil {
+					log.Errorf("failed to select control endpoint for node: %v", err)
+					continue
+				}
+				addressList = append(addressList, formatAddr(contrEndpoint.Addr, contrEndpoint.Port))
+			}
+		}
+	} else if initAny, found := r.initAddresses.Load(clusterName); found {
+		initList, ok := initAny.([]*ServiceInstance)
+		if !ok {
+			return "", fmt.Errorf("invalid init addresses type for cluster %s", clusterName)
+		}
+		for _, inetSocketAddress := range initList {
+			addressList = append(addressList, formatAddr(inetSocketAddress.Addr, inetSocketAddress.Port))
+		}
+	}
+
+	if len(addressList) == 0 {
+		return "", fmt.Errorf("no available address for cluster=%s group=%s", clusterName, group)
+	}
+
+	// *rand.Rand is not safe for concurrent use, and this method runs on both
+	// the caller's goroutine and the refresh goroutine. No caller holds r.mu
+	// at this point, so taking it here cannot deadlock.
+	r.mu.Lock()
+	idx := r.random.Intn(len(addressList))
+	r.mu.Unlock()
+	return addressList[idx], nil
+}
+
+// clusterNamesFromInit returns the cluster names for which init addresses
+// have been stored, or nil when there are none.
+func (r *RaftRegistryService) clusterNamesFromInit() []string {
+	var clusters []string
+	collect := func(k, _ any) bool {
+		name, isString := k.(string)
+		if isString {
+			clusters = append(clusters, name)
+		}
+		return true
+	}
+	r.initAddresses.Range(collect)
+	return clusters
+}
+
+// cancelOnCloseBody wraps a response body so that the request context is
+// released when the body is closed rather than when doRequest returns.
+type cancelOnCloseBody struct {
+	io.ReadCloser
+	cancel context.CancelFunc
+}
+
+// Close closes the wrapped body and then cancels the request context.
+func (b *cancelOnCloseBody) Close() error {
+	err := b.ReadCloser.Close()
+	b.cancel()
+	return err
+}
+
+// doRequest sends req with a deadline of timeout and returns the response.
+//
+// The deadline covers reading the response body as well. The context is
+// cancelled only when the caller closes resp.Body, or here if the request
+// itself fails. Cancelling on return, as a plain `defer cancel()` would,
+// aborts the body read that callers perform afterwards.
+func (r *RaftRegistryService) doRequest(req *http.Request, timeout time.Duration) (*http.Response, error) {
+	ctx, cancel := context.WithTimeout(req.Context(), timeout)
+
+	resp, err := r.httpClient.Do(req.WithContext(ctx))
+	if err != nil {
+		cancel()
+		return nil, err
+	}
+	resp.Body = &cancelOnCloseBody{ReadCloser: resp.Body, cancel: cancel}
+	return resp, nil
+}
+
+// loadInitAddresses returns the init addresses stored for clusterName and
+// true, or (nil, false) when none are stored or the stored value has an
+// unexpected type.
+func (r *RaftRegistryService) loadInitAddresses(clusterName string) ([]*ServiceInstance, bool) {
+	stored, found := r.initAddresses.Load(clusterName)
+	if !found {
+		return nil, false
+	}
+	list, isList := stored.([]*ServiceInstance)
+	if !isList {
+		return nil, false
+	}
+	return list, true
+}
diff --git a/pkg/discovery/raft_test.go b/pkg/discovery/raft_test.go
new file mode 100644
index 00000000..206e04ad
--- /dev/null
+++ b/pkg/discovery/raft_test.go
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package discovery
+
+import (
+ "encoding/json"
+ "github.com/stretchr/testify/assert"
+ "math/rand"
+ "net/http"
+ "net/http/httptest"
+ "seata.apache.org/seata-go/pkg/discovery/metadata"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+// serviceConfig maps the transaction group "default_tx_group" to the cluster
+// "default"; tests in this file share it.
+var serviceConfig = &ServiceConfig{
+	VgroupMapping: map[string]string{"default_tx_group": "default"},
+}
+
+// raftConfig is the raft section embedded in registryConfig. ServerAddr is
+// left empty here.
+var raftConfig = RaftConfig{
+	MetadataMaxAgeMs:            30000,
+	ServerAddr:                  "",
+	TokenValidityInMilliseconds: 1740000,
+}
+
+// registryConfig is a raft-type registry configuration with an empty naming
+// server address and the seata/seata credentials.
+var registryConfig = &RegistryConfig{
+	Type:             "raft",
+	NamingserverAddr: "",
+	Username:         "seata",
+	Password:         "seata",
+	Raft:             raftConfig,
+}
+
+// TestMetadataHandler starts a mock server that issues a token on the login
+// endpoint and serves a three-node cluster on the metadata endpoint (only when
+// the Authorization header carries that token). It then checks that Lookup for
+// "default_tx_group" returns the three node addresses 127.0.0.1:7001-7003.
+func TestMetadataHandler(t *testing.T) {
+	mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		t.Logf("Received request: %s %s", r.Method, r.URL.Path)
+
+		switch r.URL.Path {
+		case "/api/v1/auth/login":
+			w.Header().Set("Content-Type", "application/json")
+			w.WriteHeader(http.StatusOK)
+			_ = json.NewEncoder(w).Encode(map[string]string{
+				"code": "200",
+				"data": "mock-jwt-token",
+			})
+
+		case "/metadata/v1/cluster":
+			// The metadata request must carry the token handed out by the login endpoint.
+			if r.Header.Get("Authorization") != "mock-jwt-token" {
+				t.Errorf("Missing valid token: %s", r.Header.Get("Authorization"))
+				w.WriteHeader(http.StatusUnauthorized)
+				return
+			}
+
+			resp := metadata.MetadataResponse{
+				Term: 1,
+				Nodes: []*metadata.Node{
+					{
+						Control:     &metadata.Endpoint{Host: "127.0.0.1", Port: 7001},
+						Transaction: &metadata.Endpoint{Host: "127.0.0.1", Port: 7001},
+						Internal:    &metadata.Endpoint{Host: "127.0.0.1", Port: 9001},
+						Group:       "default",
+						Role:        metadata.LEADER,
+						Version:     "2.5.0",
+						Metadata:    map[string]interface{}{},
+					},
+					{
+						Control:     &metadata.Endpoint{Host: "127.0.0.1", Port: 7002},
+						Transaction: &metadata.Endpoint{Host: "127.0.0.1", Port: 7002},
+						Internal:    &metadata.Endpoint{Host: "127.0.0.1", Port: 9002},
+						Group:       "default",
+						Role:        metadata.FOLLOWER,
+						Version:     "2.5.0",
+						Metadata:    map[string]interface{}{},
+					},
+					{
+						Control:     &metadata.Endpoint{Host: "127.0.0.1", Port: 7003},
+						Transaction: &metadata.Endpoint{Host: "127.0.0.1", Port: 7003},
+						Internal:    &metadata.Endpoint{Host: "127.0.0.1", Port: 9003},
+						Group:       "default",
+						Role:        metadata.FOLLOWER,
+						Version:     "2.5.0",
+						Metadata:    map[string]interface{}{},
+					},
+				},
+			}
+
+			w.Header().Set("Content-Type", "application/json")
+			w.WriteHeader(http.StatusOK)
+			if err := json.NewEncoder(w).Encode(resp); err != nil {
+				t.Errorf("Failed to encode response: %v", err)
+			}
+
+		default:
+			t.Errorf("Unknown path: %s", r.URL.Path)
+			w.WriteHeader(http.StatusNotFound)
+		}
+	}))
+	defer mockServer.Close()
+
+	mockAddr := mockServer.Listener.Addr().String()
+
+	// Point a copy of the shared fixture at the mock server so the
+	// package-level registryConfig is not mutated for other tests.
+	cfg := *registryConfig
+	cfg.Raft.ServerAddr = mockAddr
+	cfg.NamingserverAddr = mockAddr
+
+	service := NewRaftRegistryService(serviceConfig, &cfg)
+	// Close the service like the other tests in this file do; this deferred
+	// call runs before mockServer.Close.
+	defer service.Close()
+
+	instances, err := service.Lookup("default_tx_group")
+	if err != nil {
+		// Without a successful lookup the comparison below is meaningless.
+		t.Fatalf("Failed to lookup service: %v", err)
+	}
+
+	serviceInstances := []*ServiceInstance{
+		{Addr: "127.0.0.1", Port: 7001},
+		{Addr: "127.0.0.1", Port: 7002},
+		{Addr: "127.0.0.1", Port: 7003},
+	}
+	assert.Equal(t, serviceInstances, instances)
+}
+
+// TestMultiClusterWatch maps two transaction groups to two different clusters,
+// looks both up against one mock server, and expects the cluster-metadata
+// endpoint to have been requested at least twice.
+func TestMultiClusterWatch(t *testing.T) {
+	var hitsMu sync.Mutex
+	hits := make(map[string]int)
+
+	// recordHit counts one request for path; the lock is held only for the update.
+	recordHit := func(path string) {
+		hitsMu.Lock()
+		hits[path]++
+		hitsMu.Unlock()
+	}
+
+	handler := func(w http.ResponseWriter, req *http.Request) {
+		recordHit(req.URL.Path)
+
+		if req.URL.Path == "/api/v1/auth/login" {
+			w.Header().Set("Content-Type", "application/json")
+			w.WriteHeader(http.StatusOK)
+			_ = json.NewEncoder(w).Encode(map[string]string{
+				"code": "200",
+				"data": "mock-jwt-token",
+			})
+			return
+		}
+
+		if req.URL.Path == "/metadata/v1/cluster" {
+			leader := &metadata.Node{
+				Control:     &metadata.Endpoint{Host: "127.0.0.1", Port: 7001},
+				Transaction: &metadata.Endpoint{Host: "127.0.0.1", Port: 7001},
+				Internal:    &metadata.Endpoint{Host: "127.0.0.1", Port: 9001},
+				Group:       "",
+				Role:        metadata.LEADER,
+				Version:     "2.5.0",
+				Metadata:    map[string]interface{}{},
+			}
+			payload := metadata.MetadataResponse{
+				Term:  1,
+				Nodes: []*metadata.Node{leader},
+			}
+			w.Header().Set("Content-Type", "application/json")
+			w.WriteHeader(http.StatusOK)
+			_ = json.NewEncoder(w).Encode(payload)
+			return
+		}
+
+		w.WriteHeader(http.StatusNotFound)
+	}
+
+	mockServer := httptest.NewServer(http.HandlerFunc(handler))
+	defer mockServer.Close()
+
+	mockAddr := mockServer.Listener.Addr().String()
+
+	multiServiceConfig := &ServiceConfig{
+		VgroupMapping: map[string]string{
+			"tx_group_1": "cluster1",
+			"tx_group_2": "cluster2",
+		},
+	}
+	multiRegistryConfig := &RegistryConfig{
+		Type:             "raft",
+		NamingserverAddr: mockAddr,
+		Username:         "seata",
+		Password:         "seata",
+		Raft: RaftConfig{
+			MetadataMaxAgeMs: int64(30000),
+			// The same mock address is listed twice, comma-separated.
+			ServerAddr:                  mockAddr + "," + mockAddr,
+			TokenValidityInMilliseconds: int64(1740000),
+		},
+	}
+
+	service := NewRaftRegistryService(multiServiceConfig, multiRegistryConfig)
+	defer service.Close()
+
+	_, firstErr := service.Lookup("tx_group_1")
+	assert.NoError(t, firstErr)
+
+	_, secondErr := service.Lookup("tx_group_2")
+	assert.NoError(t, secondErr)
+
+	hitsMu.Lock()
+	metadataHits := hits["/metadata/v1/cluster"]
+	hitsMu.Unlock()
+
+	assert.True(t, metadataHits >= 2, "Multiple clusters should trigger multiple cluster metadata requests")
+}
+
+// TestConcurrentAccess performs one Lookup to warm the service and then issues
+// twenty Lookups from separate goroutines, requiring that at least one of them
+// succeeds.
+func TestConcurrentAccess(t *testing.T) {
+	handler := func(w http.ResponseWriter, req *http.Request) {
+		switch req.URL.Path {
+		case "/api/v1/auth/login":
+			w.Header().Set("Content-Type", "application/json")
+			w.WriteHeader(http.StatusOK)
+			_ = json.NewEncoder(w).Encode(map[string]string{
+				"code": "200",
+				"data": "mock-jwt-token",
+			})
+
+		case "/metadata/v1/cluster":
+			leader := &metadata.Node{
+				Control:     &metadata.Endpoint{Host: "127.0.0.1", Port: 7001},
+				Transaction: &metadata.Endpoint{Host: "127.0.0.1", Port: 7001},
+				Internal:    &metadata.Endpoint{Host: "127.0.0.1", Port: 9001},
+				Group:       "default",
+				Role:        metadata.LEADER,
+				Version:     "2.5.0",
+				Metadata:    map[string]interface{}{},
+			}
+			w.Header().Set("Content-Type", "application/json")
+			w.WriteHeader(http.StatusOK)
+			_ = json.NewEncoder(w).Encode(metadata.MetadataResponse{
+				Term:  1,
+				Nodes: []*metadata.Node{leader},
+			})
+
+		case "/metadata/v1/watch":
+			// The watch endpoint answers with a bare 200 and no body.
+			w.WriteHeader(http.StatusOK)
+
+		default:
+			w.WriteHeader(http.StatusNotFound)
+		}
+	}
+
+	mockServer := httptest.NewServer(http.HandlerFunc(handler))
+	defer mockServer.Close()
+
+	mockAddr := mockServer.Listener.Addr().String()
+	testRegistryConfig := &RegistryConfig{
+		Type:             "raft",
+		NamingserverAddr: mockAddr,
+		Username:         "seata",
+		Password:         "seata",
+		Raft: RaftConfig{
+			MetadataMaxAgeMs:            int64(30000),
+			ServerAddr:                  mockAddr,
+			TokenValidityInMilliseconds: int64(1740000),
+		},
+	}
+
+	service := NewRaftRegistryService(serviceConfig, testRegistryConfig)
+	defer service.Close()
+
+	_, warmupErr := service.Lookup("default_tx_group")
+	assert.NoError(t, warmupErr)
+
+	const lookups = 20
+	var (
+		workers   sync.WaitGroup
+		succeeded int32
+	)
+
+	workers.Add(lookups)
+	for i := 0; i < lookups; i++ {
+		go func() {
+			defer workers.Done()
+
+			if _, lookupErr := service.Lookup("default_tx_group"); lookupErr == nil {
+				atomic.AddInt32(&succeeded, 1)
+			}
+		}()
+	}
+	workers.Wait()
+
+	assert.True(t, atomic.LoadInt32(&succeeded) > 0, "Some concurrent lookups should succeed")
+}
+
+func TestWatchErrorRecovery(t *testing.T) {
+ service := &RaftRegistryService{
+ metadata: metadata.NewMetadata(),
+ vgroupMapping: map[string]string{"test_group":
"test_cluster"},
+ currentTransactionClusterName: "test_cluster",
+ random:
rand.New(rand.NewSource(time.Now().UnixNano())),
+ initAddresses: sync.Map{},
+ cfg:
&RaftConfig{TokenValidityInMilliseconds: 1740000},
+ tokenTimestamp: time.Now().UnixMilli(),
+ httpClient: &http.Client{},
+ }
+
+ service.metadata.RefreshMetadata("test_cluster",
metadata.MetadataResponse{
+ Term: 1,
+ Nodes: []*metadata.Node{
+ {
+ Control: &metadata.Endpoint{Host:
"127.0.0.1", Port: 7001},
+ Transaction: &metadata.Endpoint{Host:
"127.0.0.1", Port: 8001},
+ Internal: &metadata.Endpoint{Host:
"127.0.0.1", Port: 9001},
+ Group: "",
+ Role: metadata.LEADER,
+ Version: "2.5.0",
+ Metadata: map[string]interface{}{},
+ },
+ },
+ })
+
+ service.initAddresses.Store("test_cluster", []*ServiceInstance{
+ {Addr: "127.0.0.1", Port: 7001},
+ })
+
+ result, err := service.watch()
+ assert.Error(t, err)
+ assert.False(t, result)
+
+ address, err := service.queryHttpAddress("test_cluster", "")
+ assert.NoError(t, err)
+ assert.NotEmpty(t, address)
+}
+
+// TestRefreshAliveLookup exercises RefreshAliveLookup after a successful
+// Lookup against a mock server exposing one node (transaction port 8001):
+//   - with alive instances 8001-8003 the call must return 8002 and 8003;
+//   - with an empty alive list it must return an empty result;
+//   - with an unmapped transaction group it must fail with "cluster not found".
+//
+// NOTE(review): why 8001 is absent from the first result is determined by
+// RefreshAliveLookup, which is not visible here — confirm against its
+// implementation.
+func TestRefreshAliveLookup(t *testing.T) {
+	handler := func(w http.ResponseWriter, req *http.Request) {
+		switch req.URL.Path {
+		case "/api/v1/auth/login":
+			w.Header().Set("Content-Type", "application/json")
+			w.WriteHeader(http.StatusOK)
+			_ = json.NewEncoder(w).Encode(map[string]string{
+				"code": "200",
+				"data": "mock-jwt-token",
+			})
+
+		case "/metadata/v1/cluster":
+			leader := &metadata.Node{
+				Control:     &metadata.Endpoint{Host: "127.0.0.1", Port: 7001},
+				Transaction: &metadata.Endpoint{Host: "127.0.0.1", Port: 8001},
+				Internal:    &metadata.Endpoint{Host: "127.0.0.1", Port: 9001},
+				Group:       "default",
+				Role:        metadata.LEADER,
+				Version:     "2.5.0",
+				Metadata:    map[string]interface{}{},
+			}
+			w.Header().Set("Content-Type", "application/json")
+			w.WriteHeader(http.StatusOK)
+			_ = json.NewEncoder(w).Encode(metadata.MetadataResponse{
+				Term:  1,
+				Nodes: []*metadata.Node{leader},
+			})
+
+		default:
+			w.WriteHeader(http.StatusNotFound)
+		}
+	}
+
+	mockServer := httptest.NewServer(http.HandlerFunc(handler))
+	defer mockServer.Close()
+
+	mockAddr := mockServer.Listener.Addr().String()
+	testRegistryConfig := &RegistryConfig{
+		Type:             "raft",
+		NamingserverAddr: mockAddr,
+		Username:         "seata",
+		Password:         "seata",
+		Raft: RaftConfig{
+			MetadataMaxAgeMs:            int64(30000),
+			ServerAddr:                  mockAddr,
+			TokenValidityInMilliseconds: int64(1740000),
+		},
+	}
+
+	service := NewRaftRegistryService(serviceConfig, testRegistryConfig)
+	defer service.Close()
+
+	_, lookupErr := service.Lookup("default_tx_group")
+	assert.NoError(t, lookupErr)
+
+	aliveInstances := []*ServiceInstance{
+		{Addr: "127.0.0.1", Port: 8001},
+		{Addr: "127.0.0.1", Port: 8002},
+		{Addr: "127.0.0.1", Port: 8003},
+	}
+
+	// Case 1: three alive instances.
+	refreshed, refreshErr := service.RefreshAliveLookup("default_tx_group", aliveInstances)
+	assert.NoError(t, refreshErr)
+	assert.Equal(t, []*ServiceInstance{
+		{Addr: "127.0.0.1", Port: 8002},
+		{Addr: "127.0.0.1", Port: 8003},
+	}, refreshed)
+
+	// Case 2: nothing alive.
+	none, noneErr := service.RefreshAliveLookup("default_tx_group", []*ServiceInstance{})
+	assert.NoError(t, noneErr)
+	assert.Empty(t, none)
+
+	// Case 3: transaction group without a cluster mapping.
+	_, unknownErr := service.RefreshAliveLookup("non_existent_group", aliveInstances)
+	assert.Error(t, unknownErr)
+	assert.Contains(t, unknownErr.Error(), "cluster not found")
+}
+
+func TestTypeAssertionSafety(t *testing.T) {
+ service := &RaftRegistryService{
+ aliveNodes: sync.Map{},
+ vgroupMapping: map[string]string{"test_group": "test_cluster"},
+ metadata: metadata.NewMetadata(),
+ random: rand.New(rand.NewSource(time.Now().UnixNano())),
+ }
+
+ service.metadata.RefreshMetadata("test_cluster",
metadata.MetadataResponse{
+ Term: 1,
+ Nodes: []*metadata.Node{
+ {
+ Control: &metadata.Endpoint{Host:
"127.0.0.1", Port: 7001},
+ Transaction: &metadata.Endpoint{Host:
"127.0.0.1", Port: 8001},
+ Internal: &metadata.Endpoint{Host:
"127.0.0.1", Port: 9001},
+ Group: "",
+ Role: metadata.LEADER,
+ Version: "2.5.0",
+ Metadata: map[string]interface{}{},
+ },
+ },
+ })
+
+ service.aliveNodes.Store("test_group", "invalid_type_string")
+ service.currentTransactionServiceGroup = "test_group"
+
+ _, err := service.queryHttpAddress("test_cluster", "")
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "invalid alive nodes type")
+
+ service.aliveNodes.Store("test_group", []*ServiceInstance{
+ {Addr: "127.0.0.1", Port: 8001},
+ })
+
+ address, err := service.queryHttpAddress("test_cluster", "")
+ assert.NoError(t, err)
+ assert.NotEmpty(t, address)
+}
diff --git a/testdata/conf/seatago.yml b/testdata/conf/seatago.yml
index fa8a7f82..6edb83e0 100644
--- a/testdata/conf/seatago.yml
+++ b/testdata/conf/seatago.yml
@@ -120,7 +120,10 @@ seata:
data-id: seata.properties
# Registration Center
registry:
- type: file
+ type: raft
+ namingserver-addr: 127.0.0.1:8081
+ username: seata
+ password: seata
file:
name: seatago.yml
nacos:
@@ -136,6 +139,10 @@ seata:
etcd3:
cluster: "default"
server-addr: "http://localhost:2379"
+ raft:
+ metadata-max-age-ms: 30000
+ server-addr:
+ token-validity-in-milliseconds: 1740000
log:
exception-rate: 100
tcc:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]