This is an automated email from the ASF dual-hosted git repository.
goodboycoder pushed a commit to branch feature/support-namingserver
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git
The following commit(s) were added to refs/heads/feature/support-namingserver
by this push:
new 169cfd13 OSPP 2025 Go&Java Version alignment /support-namingserver
(#924)
169cfd13 is described below
commit 169cfd1317ec44cc37c3e66050e3027ba3f81f60
Author: flypiggy <[email protected]>
AuthorDate: Thu Oct 30 16:48:34 2025 +0800
OSPP 2025 Go&Java Version alignment /support-namingserver (#924)
* upd-namingserver
* upd
---
pkg/discovery/base.go | 17 +-
pkg/discovery/config.go | 34 +-
pkg/discovery/init.go | 14 +
pkg/discovery/naming_server.go | 673 ++++++++++++++++++++
.../{base.go => naming_server_registry.go} | 39 +-
pkg/discovery/naming_server_test.go | 683 +++++++++++++++++++++
pkg/{discovery/base.go => util/rand/rand.go} | 33 +-
testdata/conf/seatago.yml | 11 +-
8 files changed, 1456 insertions(+), 48 deletions(-)
diff --git a/pkg/discovery/base.go b/pkg/discovery/base.go
index fcc0d016..8438a808 100644
--- a/pkg/discovery/base.go
+++ b/pkg/discovery/base.go
@@ -18,14 +18,15 @@
package discovery
const (
- FILE string = "file"
- NACOS string = "nacos"
- ETCD string = "etcd"
- EUREKA string = "eureka"
- REDIS string = "redis"
- ZK string = "zk"
- CONSUL string = "consul"
- SOFA string = "sofa"
+ FILE string = "file"
+ NACOS string = "nacos"
+ ETCD string = "etcd"
+ EUREKA string = "eureka"
+ REDIS string = "redis"
+ ZK string = "zk"
+ CONSUL string = "consul"
+ SOFA string = "sofa"
+ NAMINGSERVER string = "namingserver"
)
type ServiceInstance struct {
diff --git a/pkg/discovery/config.go b/pkg/discovery/config.go
index d8a78874..d0375c21 100644
--- a/pkg/discovery/config.go
+++ b/pkg/discovery/config.go
@@ -38,10 +38,11 @@ func (cfg *ServiceConfig) RegisterFlagsWithPrefix(prefix
string, f *flag.FlagSet
}
type RegistryConfig struct {
- Type string `yaml:"type" json:"type" koanf:"type"`
- File FileConfig `yaml:"file" json:"file" koanf:"file"`
- Nacos NacosConfig `yaml:"nacos" json:"nacos" koanf:"nacos"`
- Etcd3 Etcd3Config `yaml:"etcd3" json:"etcd3" koanf:"etcd3"`
+ Type string `yaml:"type" json:"type" koanf:"type"`
+ File FileConfig `yaml:"file" json:"file" koanf:"file"`
+ Nacos NacosConfig `yaml:"nacos" json:"nacos"
koanf:"nacos"`
+ Etcd3 Etcd3Config `yaml:"etcd3" json:"etcd3"
koanf:"etcd3"`
+ NamingServer NamingServerConfig `yaml:"naming-server"
json:"namingServer" koanf:"naming-server"`
}
func (cfg *RegistryConfig) RegisterFlagsWithPrefix(prefix string, f
*flag.FlagSet) {
@@ -49,6 +50,7 @@ func (cfg *RegistryConfig) RegisterFlagsWithPrefix(prefix
string, f *flag.FlagSe
cfg.File.RegisterFlagsWithPrefix(prefix+".file", f)
cfg.Nacos.RegisterFlagsWithPrefix(prefix+".nacos", f)
cfg.Etcd3.RegisterFlagsWithPrefix(prefix+".etcd3", f)
+ cfg.NamingServer.RegisterFlagsWithPrefix(prefix+".naming-server", f)
}
type FileConfig struct {
@@ -90,3 +92,27 @@ func (cfg *Etcd3Config) RegisterFlagsWithPrefix(prefix
string, f *flag.FlagSet)
f.StringVar(&cfg.Cluster, prefix+".cluster", "default", "The server
address of registry.")
f.StringVar(&cfg.ServerAddr, prefix+".server-addr",
"http://localhost:2379", "The server address of registry.")
}
+
+type NamingServerConfig struct {
+ Cluster string `yaml:"cluster" json:"cluster"
koanf:"cluster"`
+ ServerAddr string `yaml:"server-addr"
json:"server-addr" koanf:"server-addr"`
+ Namespace string `yaml:"namespace" json:"namespace"
koanf:"namespace"`
+ HeartbeatPeriod int `yaml:"heartbeat-period"
json:"heartbeat-period" koanf:"heartbeat-period"`
+ MetadataMaxAgeMs int64 `yaml:"metadata-max-age-ms"
json:"metadata-max-age-ms" koanf:"metadata-max-age-ms"`
+ Username string `yaml:"username" json:"username"
koanf:"username"`
+ Password string `yaml:"password" json:"password"
koanf:"password"`
+ TokenValidityInMilliseconds int64
`yaml:"token-validity-in-milliseconds" json:"token-validity-in-milliseconds"
koanf:"token-validity-in-milliseconds"`
+ SecretKey string `yaml:"secret-key" json:"secret-key"
koanf:"secret-key"`
+}
+
+func (cfg *NamingServerConfig) RegisterFlagsWithPrefix(prefix string, f
*flag.FlagSet) {
+ f.StringVar(&cfg.Cluster, prefix+".cluster", "default", "The cluster
name of naming server")
+ f.StringVar(&cfg.ServerAddr, prefix+".server-addr", "127.0.0.1:8081",
"The server address of naming server (host:port)")
+ f.StringVar(&cfg.Namespace, prefix+".namespace", "public", "The
namespace of naming server")
+ f.IntVar(&cfg.HeartbeatPeriod, prefix+".heartbeat-period", 5000, "The
heartbeat period in milliseconds")
+ f.Int64Var(&cfg.MetadataMaxAgeMs, prefix+".metadata-max-age-ms", 30000,
"The max age of metadata in milliseconds")
+ f.StringVar(&cfg.Username, prefix+".username", "", "The username for
authentication")
+ f.StringVar(&cfg.Password, prefix+".password", "", "The password for
authentication")
+ f.Int64Var(&cfg.TokenValidityInMilliseconds,
prefix+".token-validity-in-milliseconds", 29*60*1000, "The validity period of
token in milliseconds")
+ f.StringVar(&cfg.SecretKey, prefix+".secret-key", "", "The secret key
for JWT authentication (matches Naming Server's secretKey)")
+}
diff --git a/pkg/discovery/init.go b/pkg/discovery/init.go
index 8e7f5e38..b4ff329e 100644
--- a/pkg/discovery/init.go
+++ b/pkg/discovery/init.go
@@ -47,6 +47,9 @@ func InitRegistry(serviceConfig *ServiceConfig,
registryConfig *RegistryConfig)
//TODO: init consul registry
case SOFA:
//TODO: init sofa registry
+ case NAMINGSERVER:
+ // init namingserver registry
+ registryService = newNamingServerRegistryService(serviceConfig,
®istryConfig.NamingServer)
default:
err = fmt.Errorf("service registry not support registry
type:%s", registryConfig.Type)
}
@@ -60,3 +63,14 @@ func InitRegistry(serviceConfig *ServiceConfig,
registryConfig *RegistryConfig)
func GetRegistry() RegistryService {
return registryServiceInstance
}
+
+func GetNamingserverRegistry() (NamingserverRegistry, error) {
+ if registryServiceInstance == nil {
+ return nil, fmt.Errorf("registry service not initialized")
+ }
+ namingReg, ok := registryServiceInstance.(NamingserverRegistry)
+ if !ok {
+ return nil, fmt.Errorf("current registry is not namingserver")
+ }
+ return namingReg, nil
+}
diff --git a/pkg/discovery/naming_server.go b/pkg/discovery/naming_server.go
new file mode 100644
index 00000000..44117c8a
--- /dev/null
+++ b/pkg/discovery/naming_server.go
@@ -0,0 +1,673 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package discovery
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ gostnet "github.com/dubbogo/gost/net"
+ "go.uber.org/zap"
+ "net/http"
+ "net/url"
+ "seata.apache.org/seata-go/pkg/util/rand"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+const (
+ httpPrefix = "http://"
+ healthCheckThreshold = 3
+ longPollTimeoutPeriod = 28 * time.Second
+ authorizationHeader = "Authorization"
+ contentTypeJSON = "application/json"
+ namingAddrCacheTTL = 30 * time.Second
+ failureRecoveryThreshold = 3
+ maxRetryAttempts = 3
+ retryDelayMs = 1000
+)
+
+type MetaResponse struct {
+ Term int64 `json:"term"`
+ ClusterList []Cluster `json:"clusterList"`
+}
+
+type Cluster struct {
+ ClusterName string `json:"clusterName"`
+ ClusterType string `json:"clusterType"`
+ UnitData []Unit `json:"unitData"`
+}
+
+type Unit struct {
+ UnitName string `json:"unitName"`
+ NamingInstanceList []NamingServerNode `json:"namingInstanceList"`
+}
+
+type NamingServerNode struct {
+ Role ClusterRole `json:"role"`
+ Term int64 `json:"term"`
+ Transaction Endpoint `json:"transaction"`
+ Control Endpoint `json:"control"`
+ Internal Endpoint `json:"internal"`
+ Group string `json:"group"`
+ Version string `json:"version"`
+ Metadata map[string]interface{} `json:"metadata"`
+ TimeStamp int64 `json:"timeStamp"`
+ Weight float64 `json:"weight"`
+ Healthy bool `json:"healthy"`
+ Unit string `json:"unit"`
+}
+
+type Endpoint struct {
+ Host string `json:"host"`
+ Port int `json:"port"`
+ Protocol string `json:"protocol"`
+}
+
+type ExternalEndpoint struct {
+ Host string `json:"host"`
+ ControlPort int `json:"controlPort"`
+ TransactionPort int `json:"transactionPort"`
+}
+
+type ClusterRole string
+
+const (
+ ClusterRoleLeader ClusterRole = "LEADER"
+ ClusterRoleMember ClusterRole = "MEMBER"
+)
+
+type NamingServerClient struct {
+ config *NamingServerConfig
+ logger *zap.Logger
+ mu sync.Mutex
+ instance *NamingServerClient
+ term int64
+ jwtToken string
+ tokenTimeStamp int64
+ isSubscribed bool
+ namingAddrCache string
+ namingAddrCacheTimestamp int64
+
+ availableNamingMap sync.Map
+ vgroupAddressMap sync.Map
+ listenerServiceMap sync.Map
+
+ healthCheckTicker *time.Ticker
+ closeChan chan struct{}
+ wg sync.WaitGroup
+
+ httpClient *http.Client
+ longPollClient *http.Client
+}
+
+type NamingListener interface {
+ OnEvent(vGroup string) error
+}
+
+type NamingServerRegistryService struct {
+ client *NamingServerClient
+}
+
+var _ NamingserverRegistry = (*NamingServerRegistryService)(nil)
+
+func (n *NamingServerRegistryService) Lookup(key string) ([]*ServiceInstance,
error) {
+ return n.client.Lookup(key)
+}
+
+func (n *NamingServerRegistryService) Close() {
+ n.client.Close()
+}
+
+func newNamingServerRegistryService(_ *ServiceConfig, cfg *NamingServerConfig)
RegistryService {
+ client := GetInstance(cfg)
+ return &NamingServerRegistryService{
+ client: client,
+ }
+}
+
+var (
+ namingServerInstance *NamingServerClient
+ namingServerOnce sync.Once
+)
+
+func GetInstance(config *NamingServerConfig) *NamingServerClient {
+ namingServerOnce.Do(func() {
+ namingServerInstance = &NamingServerClient{
+ config: config,
+ logger:
zap.L().Named("naming-server-client"),
+ closeChan: make(chan struct{}),
+ healthCheckTicker:
time.NewTicker(time.Duration(config.HeartbeatPeriod) * time.Millisecond),
+ httpClient: &http.Client{Timeout: 3 *
time.Second},
+ longPollClient: &http.Client{Timeout: 30 *
time.Second},
+ }
+ // Initialize available naming server addresses from config
+ namingServerInstance.initNamingAddrs()
+ namingServerInstance.initHealthCheck()
+ })
+ return namingServerInstance
+}
+
+func resetInstance() {
+ if namingServerInstance != nil {
+ namingServerInstance.Close()
+ namingServerInstance.mu.Lock()
+ namingServerInstance.clearNamingAddrCache()
+ namingServerInstance.mu.Unlock()
+ }
+ namingServerInstance = nil
+ namingServerOnce = sync.Once{}
+}
+
+func (c *NamingServerClient) initNamingAddrs() {
+ addrs := c.getNamingAddrs()
+ for _, addr := range addrs {
+ c.availableNamingMap.Store(addr, int32(0))
+ }
+}
+
+func (c *NamingServerClient) initHealthCheck() {
+ c.wg.Add(1)
+ go func() {
+ defer c.wg.Done()
+ for {
+ select {
+ case <-c.healthCheckTicker.C:
+ urlList := c.getNamingAddrs()
+ c.checkAvailableNamingAddr(urlList)
+ case <-c.closeChan:
+ return
+ }
+ }
+ }()
+}
+
+func (c *NamingServerClient) checkAvailableNamingAddr(urlList []string) {
+ for _, addr := range urlList {
+ isHealthy := c.doHealthCheck(addr)
+
+ val, _ := c.availableNamingMap.LoadOrStore(addr, int32(0))
+ failCount := val.(int32)
+
+ if !isHealthy {
+ newFailCount := atomic.AddInt32(&failCount, 1)
+ c.availableNamingMap.Store(addr, newFailCount)
+ if newFailCount == 1 {
+ c.logger.Warn("naming server check failed",
zap.String("addr", addr), zap.Int32("failCount", newFailCount))
+ } else if newFailCount >= healthCheckThreshold {
+ c.logger.Error("naming server offline",
zap.String("addr", addr), zap.Int32("failCount", newFailCount))
+ c.mu.Lock()
+ if c.namingAddrCache == addr {
+ c.clearNamingAddrCache()
+ }
+ c.mu.Unlock()
+ }
+ } else {
+ if failCount > 0 {
+ c.availableNamingMap.Store(addr, int32(0))
+ c.logger.Info("naming server recovered",
zap.String("addr", addr), zap.Int32("previousFailCount", failCount))
+ }
+ }
+ }
+}
+
+func (c *NamingServerClient) doHealthCheck(addr string) bool {
+ checkURL := fmt.Sprintf("%s%s/naming/v1/health", httpPrefix, addr)
+
+ req, err := http.NewRequest(http.MethodGet, checkURL, nil)
+ if err != nil {
+ c.logger.Error("create health check request failed",
zap.Error(err))
+ return false
+ }
+ req.Header.Set("Content-Type", contentTypeJSON)
+
+ resp, err := c.httpClient.Do(req)
+ if err != nil {
+ c.logger.Error("health check failed", zap.String("addr", addr),
zap.Error(err))
+ return false
+ }
+ defer resp.Body.Close()
+ return resp.StatusCode == http.StatusOK
+}
+
+func (c *NamingServerClient) getNamingAddrs() []string {
+ return strings.Split(c.config.ServerAddr, ",")
+}
+
+func (c *NamingServerClient) Lookup(vGroup string) ([]*ServiceInstance, error)
{
+ if !c.isSubscribed {
+ if err := c.RefreshGroup(vGroup); err != nil {
+ return nil, fmt.Errorf("refresh group failed: %w", err)
+ }
+ listener := &RefreshListener{client: c}
+ if err := c.Subscribe(vGroup, listener); err != nil {
+ return nil, fmt.Errorf("subscribe failed: %w", err)
+ }
+ }
+
+ val, ok := c.vgroupAddressMap.Load(vGroup)
+ if !ok {
+ if err := c.RefreshGroup(vGroup); err != nil {
+ return nil, fmt.Errorf("refresh group failed: %w", err)
+ }
+ val, ok = c.vgroupAddressMap.Load(vGroup)
+ if !ok {
+ return nil, errors.New("no nodes found for vgroup")
+ }
+ }
+
+ nodes := val.([]NamingServerNode)
+
+ var instances []*ServiceInstance
+ for _, node := range nodes {
+ if !node.Healthy {
+ continue
+ }
+
+ if node.Transaction.Host == "" || node.Transaction.Port <= 0 ||
node.Transaction.Port > 65535 {
+ c.logger.Warn("invalid node address",
zap.String("host", node.Transaction.Host), zap.Int("port",
node.Transaction.Port))
+ continue
+ }
+
+ instances = append(instances, &ServiceInstance{
+ Addr: node.Transaction.Host,
+ Port: node.Transaction.Port,
+ })
+ }
+ return instances, nil
+}
+
+func (c *NamingServerClient) RefreshGroup(vGroup string) error {
+ namingAddr, err := c.getNamingAddr()
+ if err != nil {
+ return err
+ }
+
+ if c.isTokenExpired() {
+ if err := c.RefreshToken(namingAddr); err != nil {
+ return err
+ }
+ }
+
+ params := url.Values{}
+ params.Add("vGroup", vGroup)
+ params.Add("namespace", c.config.Namespace)
+
+ discoveryURL := fmt.Sprintf("%s%s/naming/v1/discovery?%s", httpPrefix,
namingAddr, params.Encode())
+ req, err := http.NewRequest(http.MethodGet, discoveryURL, nil)
+ if err != nil {
+ return err
+ }
+ if c.jwtToken != "" {
+ req.Header.Set(authorizationHeader, c.jwtToken)
+ }
+ req.Header.Set("Content-Type", contentTypeJSON)
+
+ resp, err := c.httpClient.Do(req)
+ if err != nil {
+ return fmt.Errorf("discovery request failed: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ if resp.StatusCode == http.StatusUnauthorized {
+ return fmt.Errorf("discovery failed: unauthorized
(401), authentication required. please configure username and password in the
naming server config")
+ }
+ return fmt.Errorf("discovery failed, status: %d",
resp.StatusCode)
+ }
+
+ var metaResp MetaResponse
+ if err := json.NewDecoder(resp.Body).Decode(&metaResp); err != nil {
+ return fmt.Errorf("decode meta response failed: %w", err)
+ }
+
+ return c.handleMetadata(&metaResp, vGroup)
+}
+
+func (c *NamingServerClient) getNamingAddr() (string, error) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ now := time.Now().UnixMilli()
+
+ if c.namingAddrCache != "" &&
+ now-c.namingAddrCacheTimestamp <
namingAddrCacheTTL.Milliseconds() {
+
+ if val, ok := c.availableNamingMap.Load(c.namingAddrCache); ok {
+ failCount := val.(int32)
+ if failCount < healthCheckThreshold {
+ return c.namingAddrCache, nil
+ }
+ }
+
+ c.clearNamingAddrCache()
+ }
+
+ var availableAddrs []string
+ c.availableNamingMap.Range(func(key, value interface{}) bool {
+ addr := key.(string)
+ failCount := value.(int32)
+ if failCount < healthCheckThreshold {
+ availableAddrs = append(availableAddrs, addr)
+ }
+ return true
+ })
+
+ if len(availableAddrs) == 0 {
+ return "", errors.New("no available naming server")
+ }
+
+ addr := availableAddrs[rand.RandIntn(len(availableAddrs))]
+ c.namingAddrCache = addr
+ c.namingAddrCacheTimestamp = now
+ return addr, nil
+}
+
+func (c *NamingServerClient) clearNamingAddrCache() {
+ c.namingAddrCache = ""
+ c.namingAddrCacheTimestamp = 0
+}
+
+func (c *NamingServerClient) isTokenExpired() bool {
+ if c.config.Username == "" || c.config.Password == "" {
+ return false
+ }
+
+ ts := atomic.LoadInt64(&c.tokenTimeStamp)
+ if ts == 0 {
+ return true
+ }
+
+ return time.Now().UnixMilli() >= ts+c.config.TokenValidityInMilliseconds
+}
+
+func (c *NamingServerClient) RefreshToken(addr string) error {
+ if c.config.Username == "" || c.config.Password == "" {
+ return nil
+ }
+
+ var lastErr error
+ for attempt := 1; attempt <= maxRetryAttempts; attempt++ {
+ err := c.doRefreshToken(addr)
+ if err == nil {
+ return nil
+ }
+
+ lastErr = err
+ if attempt < maxRetryAttempts {
+ c.logger.Warn("token refresh failed, retrying",
+ zap.String("addr", addr),
+ zap.Int("attempt", attempt),
+ zap.Error(err))
+ time.Sleep(time.Duration(retryDelayMs*attempt) *
time.Millisecond)
+ }
+ }
+
+ c.logger.Error("token refresh failed after all retries",
+ zap.String("addr", addr),
+ zap.Int("maxAttempts", maxRetryAttempts),
+ zap.Error(lastErr))
+ return fmt.Errorf("token refresh failed after %d attempts: %w",
maxRetryAttempts, lastErr)
+}
+
+func (c *NamingServerClient) doRefreshToken(addr string) error {
+ loginURL := fmt.Sprintf("%s%s/api/v1/auth/login", httpPrefix, addr)
+
+ loginReq := struct {
+ Username string `json:"username"`
+ Password string `json:"password"`
+ }{
+ Username: c.config.Username,
+ Password: c.config.Password,
+ }
+
+ body, err := json.Marshal(loginReq)
+ if err != nil {
+ return fmt.Errorf("marshal login request failed: %w", err)
+ }
+
+ req, err := http.NewRequest(http.MethodPost, loginURL,
strings.NewReader(string(body)))
+ if err != nil {
+ return err
+ }
+ req.Header.Set("Content-Type", contentTypeJSON)
+
+ resp, err := c.httpClient.Do(req)
+ if err != nil {
+ return fmt.Errorf("login request failed: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ c.logger.Error("login request failed",
+ zap.String("addr", addr),
+ zap.Int("statusCode", resp.StatusCode),
+ zap.String("username", c.config.Username))
+ return fmt.Errorf("authentication failed: status %d",
resp.StatusCode)
+ }
+
+ var loginResp struct {
+ Success bool `json:"success"`
+ Data string `json:"data"`
+ Code interface{} `json:"code"`
+ Msg string `json:"msg"`
+ }
+ if err := json.NewDecoder(resp.Body).Decode(&loginResp); err != nil {
+ return fmt.Errorf("decode login response failed: %w", err)
+ }
+
+ c.logger.Debug("login response received",
+ zap.Bool("success", loginResp.Success),
+ zap.String("code", fmt.Sprint(loginResp.Code)),
+ zap.String("msg", loginResp.Msg),
+ zap.String("data", loginResp.Data))
+
+ if !loginResp.Success {
+ errMsg := fmt.Sprintf("authentication failed: success=false,
msg=%s", loginResp.Msg)
+ c.logger.Error(errMsg)
+ return fmt.Errorf(errMsg)
+ }
+ if loginResp.Data == "" {
+ return fmt.Errorf("authentication failed: token is empty")
+ }
+
+ c.jwtToken = loginResp.Data
+ atomic.StoreInt64(&c.tokenTimeStamp, time.Now().UnixMilli())
+ c.logger.Info("token refreshed successfully", zap.String("addr", addr))
+ return nil
+}
+
+func (c *NamingServerClient) handleMetadata(metaResp *MetaResponse, vGroup
string) error {
+ if metaResp.Term > 0 {
+ atomic.StoreInt64(&c.term, metaResp.Term)
+ }
+
+ var newNodes []NamingServerNode
+ for _, cluster := range metaResp.ClusterList {
+ for _, unit := range cluster.UnitData {
+ for _, node := range unit.NamingInstanceList {
+ if (node.Role == ClusterRoleLeader && node.Term
>= atomic.LoadInt64(&c.term)) ||
+ node.Role == ClusterRoleMember {
+ newNodes = append(newNodes, node)
+ }
+ }
+ }
+ }
+
+ c.vgroupAddressMap.Store(vGroup, newNodes)
+ return nil
+}
+
+type RefreshListener struct {
+ client *NamingServerClient
+}
+
+func (l *RefreshListener) OnEvent(vGroup string) error {
+ return l.client.RefreshGroup(vGroup)
+}
+
+func (c *NamingServerClient) Subscribe(vGroup string, listener NamingListener)
error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ val, _ := c.listenerServiceMap.LoadOrStore(vGroup, []NamingListener{})
+ listeners := append(val.([]NamingListener), listener)
+ c.listenerServiceMap.Store(vGroup, listeners)
+
+ if !c.isSubscribed {
+ c.isSubscribed = true
+ c.wg.Add(1)
+ go c.watchLoop(vGroup)
+ }
+ return nil
+}
+
+func (c *NamingServerClient) watchLoop(vGroup string) {
+ defer c.wg.Done()
+ var retryCount int
+
+ for {
+ select {
+ case <-c.closeChan:
+ return
+ default:
+ }
+
+ changed, err := c.Watch(vGroup)
+ if err != nil {
+ retryCount++
+ if retryCount > failureRecoveryThreshold {
+ c.logger.Error("watch failed continuously, will
retry",
+ zap.Error(err), zap.Int("retryCount",
retryCount))
+ select {
+ case <-time.After(time.Duration(retryDelayMs) *
time.Millisecond):
+ case <-c.closeChan:
+ return
+ }
+ } else {
+ c.logger.Warn("watch error, will retry
immediately", zap.Error(err))
+ }
+ continue
+ }
+
+ retryCount = 0
+
+ if changed {
+ if err := c.RefreshGroup(vGroup); err != nil {
+ c.logger.Error("refresh group failed in watch",
zap.Error(err))
+ continue
+ }
+ val, ok := c.listenerServiceMap.Load(vGroup)
+ if !ok {
+ continue
+ }
+ for _, listener := range val.([]NamingListener) {
+ if err := listener.OnEvent(vGroup); err != nil {
+ c.logger.Warn("listener callback
failed", zap.Error(err))
+ }
+ }
+ }
+ }
+}
+
+func (c *NamingServerClient) Watch(vGroup string) (bool, error) {
+ namingAddr, err := c.getNamingAddr()
+ if err != nil {
+ return false, err
+ }
+
+ if c.isTokenExpired() {
+ if err := c.RefreshToken(namingAddr); err != nil {
+ return false, err
+ }
+ }
+
+ clientIP, err := gostnet.GetLocalIP()
+ if err != nil {
+ return false, fmt.Errorf("failed to get local IP: %w", err)
+ }
+
+ params := url.Values{}
+ params.Add("vGroup", vGroup)
+ params.Add("clientTerm", strconv.FormatInt(atomic.LoadInt64(&c.term),
10))
+ params.Add("timeout",
strconv.FormatInt(longPollTimeoutPeriod.Milliseconds(), 10))
+ params.Add("clientAddr", clientIP)
+
+ watchURL := fmt.Sprintf("%s%s/naming/v1/watch?%s", httpPrefix,
namingAddr, params.Encode())
+ req, err := http.NewRequest(http.MethodPost, watchURL, nil)
+ if err != nil {
+ return false, err
+ }
+ if c.jwtToken != "" {
+ req.Header.Set(authorizationHeader, c.jwtToken)
+ }
+ req.Header.Set("Content-Type", contentTypeJSON)
+
+ resp, err := c.longPollClient.Do(req)
+ if err != nil {
+ return false, fmt.Errorf("watch request failed: %w", err)
+ }
+ defer resp.Body.Close()
+
+ switch resp.StatusCode {
+ case http.StatusOK:
+ return true, nil
+ case http.StatusNotModified:
+ return false, nil
+ case http.StatusUnauthorized:
+ return false, fmt.Errorf("watch failed: unauthorized (401),
token may have expired or authentication is required")
+ default:
+ return false, fmt.Errorf("watch request failed with status
code: %d", resp.StatusCode)
+ }
+}
+
+func (c *NamingServerClient) Close() {
+ close(c.closeChan)
+ c.healthCheckTicker.Stop()
+ c.wg.Wait()
+ c.logger.Info("naming server client closed")
+}
+
+func (n *NamingServerRegistryService) Register(instance *ServiceInstance)
error {
+ //TODO implement me
+ panic("implement me")
+}
+
+func (n *NamingServerRegistryService) Deregister(instance *ServiceInstance)
error {
+ //TODO implement me
+ panic("implement me")
+}
+
+func (n *NamingServerRegistryService) doHealthCheck(addr string) bool {
+ return n.client.doHealthCheck(addr)
+}
+
+func (n *NamingServerRegistryService) RefreshToken(addr string) error {
+ return n.client.RefreshToken(addr)
+}
+
+func (n *NamingServerRegistryService) RefreshGroup(vGroup string) error {
+ return n.client.RefreshGroup(vGroup)
+}
+
+func (n *NamingServerRegistryService) Watch(vGroup string) (bool, error) {
+ return n.client.Watch(vGroup)
+}
diff --git a/pkg/discovery/base.go b/pkg/discovery/naming_server_registry.go
similarity index 56%
copy from pkg/discovery/base.go
copy to pkg/discovery/naming_server_registry.go
index fcc0d016..f5289fd8 100644
--- a/pkg/discovery/base.go
+++ b/pkg/discovery/naming_server_registry.go
@@ -17,23 +17,26 @@
package discovery
-const (
- FILE string = "file"
- NACOS string = "nacos"
- ETCD string = "etcd"
- EUREKA string = "eureka"
- REDIS string = "redis"
- ZK string = "zk"
- CONSUL string = "consul"
- SOFA string = "sofa"
-)
-
-type ServiceInstance struct {
- Addr string
- Port int
-}
+type NamingserverRegistry interface {
+ RegistryService
+
+ Register(instance *ServiceInstance) error
+
+ Deregister(instance *ServiceInstance) error
+
+ // doHealthCheck
+ // perform a health check and call the/amine/v1/health interface
+ doHealthCheck(addr string) bool
+
+ // RefreshToken
+ // Refresh the JWT token and call the /api/v1/auth/login interface.
+ RefreshToken(addr string) error
+
+ // RefreshGroup
+ // Refresh the service group information and call the
/naming/v1/discovery interface.
+ RefreshGroup(vGroup string) error
-type RegistryService interface {
- Lookup(key string) ([]*ServiceInstance, error)
- Close()
+ // Watch
+ // Monitor service changes and call the /naming/v1/watch interface.
+ Watch(vGroup string) (bool, error)
}
diff --git a/pkg/discovery/naming_server_test.go
b/pkg/discovery/naming_server_test.go
new file mode 100644
index 00000000..f342c631
--- /dev/null
+++ b/pkg/discovery/naming_server_test.go
@@ -0,0 +1,683 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package discovery
+
+import (
+ "encoding/json"
+ "flag"
+ "go.uber.org/zap"
+ "gopkg.in/yaml.v2"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+// fake config for tests
+var testConfig = &NamingServerConfig{
+ ServerAddr: "127.0.0.1:8080,localhost:9090",
+ HeartbeatPeriod: 1000,
+ Namespace: "testns",
+ Username: "user",
+ Password: "pass",
+ TokenValidityInMilliseconds: 100,
+ MetadataMaxAgeMs: 1000,
+}
+
+// Test getNamingAddrs splits config.ServerAddr
+func TestGetNamingAddrs(t *testing.T) {
+ client := &NamingServerClient{
+ config: testConfig,
+ httpClient: &http.Client{Timeout: 3 * time.Second},
+ longPollClient: &http.Client{Timeout: 30 * time.Second},
+ }
+ addrs := client.getNamingAddrs()
+ if len(addrs) != 2 {
+ t.Errorf("expected 2 addresses, got %d", len(addrs))
+ }
+}
+
+// Test isTokenExpired behavior
+func TestIsTokenExpired_NoCredentials(t *testing.T) {
+ cfg := *testConfig
+ cfg.Username = ""
+ cfg.Password = ""
+ client := &NamingServerClient{
+ config: &cfg,
+ httpClient: &http.Client{Timeout: 3 * time.Second},
+ longPollClient: &http.Client{Timeout: 30 * time.Second},
+ }
+ // without credentials, token should never be considered expired
+ atomic.StoreInt64(&client.tokenTimeStamp, 0)
+ if client.isTokenExpired() {
+ t.Error("expected token not expired when no credentials")
+ }
+}
+
+func TestIsTokenExpired_WithCredentials(t *testing.T) {
+ client := &NamingServerClient{
+ config: testConfig,
+ httpClient: &http.Client{Timeout: 3 * time.Second},
+ longPollClient: &http.Client{Timeout: 30 * time.Second},
+ }
+ // timestamp zero => expired
+ atomic.StoreInt64(&client.tokenTimeStamp, 0)
+ if !client.isTokenExpired() {
+ t.Error("expected token expired when timestamp zero and
credentials present")
+ }
+ // future timestamp => not expired
+ atomic.StoreInt64(&client.tokenTimeStamp, time.Now().UnixMilli())
+ if client.isTokenExpired() {
+ t.Error("expected token not expired when timestamp fresh")
+ }
+}
+
+// Test doHealthCheck with a local httptest server
+func TestDoHealthCheck(t *testing.T) {
+ // healthy server
+ healthy := httptest.NewServer(http.HandlerFunc(func(w
http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(http.StatusOK)
+ }))
+ defer healthy.Close()
+ addr := healthy.Listener.Addr().String()
+ client := &NamingServerClient{
+ logger: zap.NewNop(),
+ httpClient: &http.Client{Timeout: 3 * time.Second},
+ longPollClient: &http.Client{Timeout: 30 * time.Second},
+ }
+ if !client.doHealthCheck(addr) {
+ t.Error("expected healthy server to return true")
+ }
+
+ // unhealthy server
+ unhealthy := httptest.NewServer(http.HandlerFunc(func(w
http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(http.StatusInternalServerError)
+ }))
+ defer unhealthy.Close()
+ addr2 := unhealthy.Listener.Addr().String()
+ if client.doHealthCheck(addr2) {
+ t.Error("expected unhealthy server to return false")
+ }
+}
+
+func TestHealthCheckThreshold(t *testing.T) {
+ resetInstance()
+ client := &NamingServerClient{
+ config: testConfig,
+ logger: zap.NewNop(),
+ httpClient: &http.Client{Timeout: 3 * time.Second},
+ longPollClient: &http.Client{Timeout: 30 * time.Second},
+ }
+
+ addr := "test-addr"
+
+ client.availableNamingMap.Store(addr, int32(0))
+ val, _ := client.availableNamingMap.Load(addr)
+ if val.(int32) != 0 {
+ t.Error("initial fail count should be 0")
+ }
+
+ client.checkAvailableNamingAddr([]string{addr})
+ val, _ = client.availableNamingMap.Load(addr)
+ if val.(int32) != 1 {
+ t.Error("fail count should be 1 after first failure")
+ }
+
+ client.checkAvailableNamingAddr([]string{addr})
+ val, _ = client.availableNamingMap.Load(addr)
+ if val.(int32) != 2 {
+ t.Error("fail count should be 2 after second failure")
+ }
+
+ client.checkAvailableNamingAddr([]string{addr})
+ val, _ = client.availableNamingMap.Load(addr)
+ if val.(int32) != 3 {
+ t.Error("fail count should be 3 after third failure")
+ }
+}
+
+// Test handleMetadata filters nodes correctly
+func TestHandleMetadata(t *testing.T) {
+ client := &NamingServerClient{
+ httpClient: &http.Client{Timeout: 3 * time.Second},
+ longPollClient: &http.Client{Timeout: 30 * time.Second},
+ }
+ meta := &MetaResponse{
+ Term: 2,
+ ClusterList: []Cluster{
+ {
+ ClusterName: "c1",
+ ClusterType: "t1",
+ UnitData: []Unit{
+ {
+ UnitName: "u1",
+ NamingInstanceList:
[]NamingServerNode{
+ {Role:
ClusterRoleLeader, Term: 2, Healthy: true},
+ {Role:
ClusterRoleLeader, Term: 1, Healthy: true},
+ {Role:
ClusterRoleMember, Term: 1, Healthy: false},
+ },
+ },
+ },
+ },
+ },
+ }
+ err := client.handleMetadata(meta, "vg1")
+ if err != nil {
+ t.Fatalf("handleMetadata returned error: %v", err)
+ }
+ val, ok := client.vgroupAddressMap.Load("vg1")
+ if !ok {
+ t.Fatal("vgroupAddressMap missing key vg1")
+ }
+ nodes := val.([]NamingServerNode)
+ // expect only leader with term>=2 and all members
+ if len(nodes) != 2 {
+ t.Errorf("expected 2 nodes, got %d", len(nodes))
+ }
+}
+
+// Additional tests for getNamingAddr when availableNamingMap populated
+func TestGetNamingAddr_NoAvailable(t *testing.T) {
+ client := &NamingServerClient{
+ config: testConfig,
+ httpClient: &http.Client{Timeout: 3 * time.Second},
+ longPollClient: &http.Client{Timeout: 30 * time.Second},
+ }
+ // empty map
+ _, err := client.getNamingAddr()
+ if err == nil {
+ t.Error("expected error when no available naming server")
+ }
+}
+
+func TestGetNamingAddr_OneAvailable(t *testing.T) {
+ client := &NamingServerClient{
+ config: testConfig,
+ httpClient: &http.Client{Timeout: 3 * time.Second},
+ longPollClient: &http.Client{Timeout: 30 * time.Second},
+ }
+ client.availableNamingMap.Store("host1", int32(0))
+ addr, err := client.getNamingAddr()
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if addr != "host1" {
+ t.Errorf("expected host1, got %s", addr)
+ }
+}
+
+func TestGetNamingAddr_CacheExpiry(t *testing.T) {
+ resetInstance()
+ client := &NamingServerClient{
+ config: testConfig,
+ httpClient: &http.Client{Timeout: 3 * time.Second},
+ longPollClient: &http.Client{Timeout: 30 * time.Second},
+ }
+ client.availableNamingMap.Store("host1", int32(0))
+ client.availableNamingMap.Store("host2", int32(0))
+
+ _, err := client.getNamingAddr()
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ client.namingAddrCacheTimestamp = time.Now().UnixMilli() - 31000
+
+ _, err = client.getNamingAddr()
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if client.namingAddrCacheTimestamp <= time.Now().UnixMilli()-30000 {
+ t.Error("cache timestamp should be updated after expiry")
+ }
+}
+
+func TestLookup_FirstCall(t *testing.T) {
+ // Reset naming server singleton instance
+ resetInstance()
+ // Create mock naming server with necessary endpoints (login,
discovery, health)
+ mockServer := httptest.NewServer(http.HandlerFunc(func(w
http.ResponseWriter, r *http.Request) {
+ t.Logf("Received request: %s %s", r.Method, r.URL.Path)
+
+ switch r.URL.Path {
+ case "/api/v1/auth/login":
+ // Handle login request, return valid JWT token
+ w.Header().Set("Content-Type", contentTypeJSON)
+ w.WriteHeader(http.StatusOK)
+ _ = json.NewEncoder(w).Encode(map[string]string{
+ "code": "200",
+ "data": "mock-jwt-token",
+ })
+
+ case "/naming/v1/discovery":
+ // Verify auth token
+ if r.Header.Get(authorizationHeader) !=
"mock-jwt-token" {
+ t.Errorf("Missing valid token: %s",
r.Header.Get(authorizationHeader))
+ w.WriteHeader(http.StatusUnauthorized)
+ return
+ }
+
+ // Construct mock response with 4 nodes (2 valid, 2
invalid)
+ resp := MetaResponse{
+ Term: 1,
+ ClusterList: []Cluster{{
+ ClusterName: "testCluster",
+ ClusterType: "SEATA",
+ UnitData: []Unit{{
+ UnitName: "testUnit",
+ NamingInstanceList:
[]NamingServerNode{
+ {Role:
ClusterRoleLeader, Term: 1, Healthy: true, Transaction: Endpoint{Host:
"10.0.0.1", Port: 8091, Protocol: "tcp"}, Version: "1.0.0"},
+ {Role:
ClusterRoleMember, Term: 1, Healthy: true, Transaction: Endpoint{Host:
"10.0.0.2", Port: 8092, Protocol: "tcp"}, Version: "1.0.0"},
+ {Healthy: false,
Transaction: Endpoint{Host: "10.0.0.3", Port: 8093}},
+ {Healthy: true,
Transaction: Endpoint{Host: "10.0.0.4", Port: 70000}},
+ },
+ }},
+ }},
+ }
+
+ w.Header().Set("Content-Type", contentTypeJSON)
+ w.WriteHeader(http.StatusOK)
+ if err := json.NewEncoder(w).Encode(resp); err != nil {
+ t.Errorf("Failed to encode response: %v", err)
+ }
+
+ case "/naming/v1/health":
+ w.WriteHeader(http.StatusOK)
+
+ default:
+ t.Errorf("Unknown path: %s", r.URL.Path)
+ w.WriteHeader(http.StatusNotFound)
+ }
+ }))
+ defer mockServer.Close()
+
+ // Configure client to use mock server
+ mockAddr := mockServer.Listener.Addr().String()
+ testConfig.ServerAddr = mockAddr
+ client := GetInstance(testConfig)
+ client.logger = zap.NewNop()
+ client.availableNamingMap.Store(mockAddr, int32(0))
+ client.namingAddrCache = mockAddr
+
+ // Call Lookup and check for errors
+ instances, err := client.Lookup("testGroup")
+ if err != nil {
+ t.Fatalf("Lookup failed: %v", err)
+ }
+
+ // Verify cached data exists
+ val, ok := client.vgroupAddressMap.Load("testGroup")
+ if !ok {
+ t.Fatal("No testGroup data in cache")
+ }
+ nodes := val.([]NamingServerNode)
+ t.Logf("Fetched %d nodes from server", len(nodes))
+ for i, node := range nodes {
+ t.Logf("Node %d: Healthy=%v, Host=%s, Port=%d", i,
node.Healthy, node.Transaction.Host, node.Transaction.Port)
+ }
+
+ // Verify correct number of valid instances (2 expected)
+ if len(instances) != 2 {
+ t.Fatalf("Expected 2 instances, got %d", len(instances))
+ }
+ expected := map[string]int{"10.0.0.1": 8091, "10.0.0.2": 8092}
+ for _, inst := range instances {
+ if port, ok := expected[inst.Addr]; !ok || inst.Port != port {
+ t.Errorf("Mismatched instance: %s:%d", inst.Addr,
inst.Port)
+ }
+ }
+}
+
+func TestRefreshToken_Success(t *testing.T) {
+ // Reset naming server singleton instance
+ resetInstance()
+ // Mock login endpoint returning valid token
+ mockServer := httptest.NewServer(http.HandlerFunc(func(w
http.ResponseWriter, r *http.Request) {
+ if r.URL.Path != "/api/v1/auth/login" || r.Method !=
http.MethodPost {
+ t.Errorf("Invalid request: %s %s", r.Method, r.URL.Path)
+ }
+ // Verify credentials
+ if r.URL.Query().Get("username") != testConfig.Username ||
r.URL.Query().Get("password") != testConfig.Password {
+ t.Error("Invalid username/password in request")
+ }
+ // Return success response with token
+ w.WriteHeader(http.StatusOK)
+ _ = json.NewEncoder(w).Encode(map[string]string{
+ "code": "200",
+ "data": "mock-jwt-token",
+ })
+ }))
+ defer mockServer.Close()
+
+ client := GetInstance(testConfig)
+ client.logger = zap.NewNop()
+ err := client.RefreshToken(mockServer.Listener.Addr().String())
+ if err != nil {
+ t.Fatalf("RefreshToken failed: %v", err)
+ }
+ if client.jwtToken != "mock-jwt-token" {
+ t.Error("jwtToken not updated after login")
+ }
+}
+
+func TestRefreshToken_RetryMechanism(t *testing.T) {
+ resetInstance()
+ attemptCount := 0
+ mockServer := httptest.NewServer(http.HandlerFunc(func(w
http.ResponseWriter, r *http.Request) {
+ attemptCount++
+ if attemptCount < 3 {
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+ w.WriteHeader(http.StatusOK)
+ _ = json.NewEncoder(w).Encode(map[string]string{
+ "code": "200",
+ "data": "retry-success-token",
+ })
+ }))
+ defer mockServer.Close()
+
+ client := GetInstance(testConfig)
+ client.logger = zap.NewNop()
+ err := client.RefreshToken(mockServer.Listener.Addr().String())
+ if err != nil {
+ t.Fatalf("RefreshToken failed: %v", err)
+ }
+ if attemptCount != 3 {
+ t.Errorf("Expected 3 attempts, got %d", attemptCount)
+ }
+ if client.jwtToken != "retry-success-token" {
+ t.Error("jwtToken not updated after retry success")
+ }
+}
+
+func TestWatch(t *testing.T) {
+ // Reset naming server singleton instance
+ resetInstance()
+ mockServer := httptest.NewServer(http.HandlerFunc(func(w
http.ResponseWriter, r *http.Request) {
+ switch r.URL.Path {
+ case "/api/v1/auth/login":
+ // Handle login request
+ if r.Method != http.MethodPost {
+ t.Errorf("login: Expected POST, got %s",
r.Method)
+ }
+ if r.URL.Query().Get("username") != testConfig.Username
|| r.URL.Query().Get("password") != testConfig.Password {
+ t.Error("login: Invalid username/password")
+ }
+ w.Header().Set("Content-Type", contentTypeJSON)
+ w.WriteHeader(http.StatusOK)
+ _ = json.NewEncoder(w).Encode(map[string]string{
+ "code": "200",
+ "data": "mock-jwt-token",
+ })
+
+ case "/naming/v1/watch":
+ // Verify auth token
+ if r.Header.Get(authorizationHeader) !=
"mock-jwt-token" {
+ t.Errorf("watch: Missing valid token, got %s",
r.Header.Get(authorizationHeader))
+ w.WriteHeader(http.StatusUnauthorized)
+ return
+ }
+ // Verify request method and parameters
+ if r.Method != http.MethodPost {
+ t.Errorf("watch: Expected POST, got %s",
r.Method)
+ }
+ params := r.URL.Query()
+ if params.Get("vGroup") != "testGroup" {
+ t.Errorf("watch: Invalid vGroup, got %s",
params.Get("vGroup"))
+ }
+ if params.Get("clientTerm") != "0" {
+ t.Errorf("watch: Invalid clientTerm, got %s",
params.Get("clientTerm"))
+ }
+ w.WriteHeader(http.StatusOK)
+
+ default:
+ t.Errorf("Unknown path: %s", r.URL.Path)
+ w.WriteHeader(http.StatusNotFound)
+ }
+ }))
+ defer mockServer.Close()
+
+ // Configure client
+ mockAddr := mockServer.Listener.Addr().String()
+ client := GetInstance(testConfig)
+ client.logger = zap.NewNop()
+ client.availableNamingMap.Store(mockAddr, int32(0))
+ client.namingAddrCache = mockAddr
+
+ // Call Watch and verify result
+ changed, err := client.Watch("testGroup")
+ if err != nil {
+ t.Fatalf("Watch failed: %v", err)
+ }
+ if !changed {
+ t.Error("Expected change detected (changed=true), got false")
+ }
+}
+
+// Test Watch handling non-200 responses
+func TestWatch_ErrorResponse(t *testing.T) {
+ // Reset naming server singleton instance
+ resetInstance()
+ // Mock server returning error status
+ mockServer := httptest.NewServer(http.HandlerFunc(func(w
http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(http.StatusInternalServerError)
+ }))
+ defer mockServer.Close()
+
+ client := GetInstance(testConfig)
+ client.logger = zap.NewNop()
+ mockAddr := mockServer.Listener.Addr().String()
+ client.availableNamingMap.Store(mockAddr, int32(0))
+ client.namingAddrCache = mockAddr
+
+ // Verify Watch returns error for non-200 status
+ _, err := client.Watch("testGroup")
+ if err == nil {
+ t.Error("Expected error from Watch, got nil")
+ }
+}
+
+func TestNamingServerConfig_LoadFromYAML(t *testing.T) {
+ // Prepare test YAML content with custom configurations
+ yamlContent := `
+naming-server:
+ cluster: "test-cluster"
+ server-addr: "192.168.1.100:8888"
+ namespace: "test-ns"
+ heartbeat-period: 3000
+ metadata-max-age-ms: 60000
+ username: "test-user"
+ password: "test-pass"
+ token-validity-in-milliseconds: 3600000
+`
+
+ // Create temporary YAML file
+ tmpFile, err := os.CreateTemp("", "naming-server-test-*.yaml")
+ if err != nil {
+ t.Fatalf("Failed to create temp file: %v", err)
+ }
+ defer os.Remove(tmpFile.Name())
+
+ if _, err := tmpFile.WriteString(yamlContent); err != nil {
+ t.Fatalf("Failed to write YAML: %v", err)
+ }
+ tmpFile.Close()
+
+ // Parse YAML into config structure
+ var registryConfig struct {
+ NamingServer NamingServerConfig `yaml:"naming-server"`
+ }
+ data, err := os.ReadFile(tmpFile.Name())
+ if err != nil {
+ t.Fatalf("Failed to read temp file: %v", err)
+ }
+ if err := yaml.Unmarshal(data, ®istryConfig); err != nil {
+ t.Fatalf("Failed to unmarshal YAML: %v", err)
+ }
+
+ // Verify parsed values match YAML content
+ nsConfig := registryConfig.NamingServer
+ if nsConfig.Cluster != "test-cluster" {
+ t.Errorf("Cluster: expected 'test-cluster', got %s",
nsConfig.Cluster)
+ }
+ if nsConfig.ServerAddr != "192.168.1.100:8888" {
+ t.Errorf("ServerAddr: expected '192.168.1.100:8888', got %s",
nsConfig.ServerAddr)
+ }
+ if nsConfig.Namespace != "test-ns" {
+ t.Errorf("Namespace: expected 'test-ns', got %s",
nsConfig.Namespace)
+ }
+ if nsConfig.HeartbeatPeriod != 3000 {
+ t.Errorf("HeartbeatPeriod: expected 3000, got %d",
nsConfig.HeartbeatPeriod)
+ }
+ if nsConfig.Username != "test-user" {
+ t.Errorf("Username: expected 'test-user', got %s",
nsConfig.Username)
+ }
+ if nsConfig.Password != "test-pass" {
+ t.Errorf("Password: expected 'test-pass', got %s",
nsConfig.Password)
+ }
+ if nsConfig.TokenValidityInMilliseconds != 3600000 {
+ t.Errorf("TokenValidity: expected 3600000, got %d",
nsConfig.TokenValidityInMilliseconds)
+ }
+}
+
+func TestNamingServerConfig_DefaultValues(t *testing.T) {
+ // Initialize default values using flag registration
+ var nsConfig NamingServerConfig
+ fs := flag.NewFlagSet("test", flag.ContinueOnError)
+ nsConfig.RegisterFlagsWithPrefix("naming-server", fs)
+ if err := fs.Parse([]string{}); err != nil {
+ t.Fatalf("Failed to parse flags: %v", err)
+ }
+
+ // Parse empty YAML configuration
+ yamlContent := `naming-server: {}`
+ tmpFile, err := os.CreateTemp("", "naming-server-default-*.yaml")
+ if err != nil {
+ t.Fatalf("Failed to create temp file: %v", err)
+ }
+ defer os.Remove(tmpFile.Name())
+ if _, err := tmpFile.WriteString(yamlContent); err != nil {
+ t.Fatalf("Failed to write YAML: %v", err)
+ }
+ tmpFile.Close()
+
+ // Load empty config from YAML
+ var tmpConfig struct {
+ NamingServer NamingServerConfig `yaml:"naming-server"`
+ }
+ data, _ := os.ReadFile(tmpFile.Name())
+ if err := yaml.Unmarshal(data, &tmpConfig); err != nil {
+ t.Fatalf("Failed to unmarshal YAML: %v", err)
+ }
+
+ // Merge YAML values with defaults (YAML takes precedence)
+ if tmpConfig.NamingServer.ServerAddr != "" {
+ nsConfig.ServerAddr = tmpConfig.NamingServer.ServerAddr
+ }
+ if tmpConfig.NamingServer.Namespace != "" {
+ nsConfig.Namespace = tmpConfig.NamingServer.Namespace
+ }
+ if tmpConfig.NamingServer.HeartbeatPeriod != 0 {
+ nsConfig.HeartbeatPeriod =
tmpConfig.NamingServer.HeartbeatPeriod
+ }
+ if tmpConfig.NamingServer.MetadataMaxAgeMs != 0 {
+ nsConfig.MetadataMaxAgeMs =
tmpConfig.NamingServer.MetadataMaxAgeMs
+ }
+ if tmpConfig.NamingServer.Username != "" {
+ nsConfig.Username = tmpConfig.NamingServer.Username
+ }
+ if tmpConfig.NamingServer.TokenValidityInMilliseconds != 0 {
+ nsConfig.TokenValidityInMilliseconds =
tmpConfig.NamingServer.TokenValidityInMilliseconds
+ }
+
+ // Verify default values match expected
+ if nsConfig.ServerAddr != "127.0.0.1:8081" {
+ t.Errorf("ServerAddr default: expected '127.0.0.1:8081', got
%s", nsConfig.ServerAddr)
+ }
+ if nsConfig.Namespace != "public" {
+ t.Errorf("Namespace default: expected 'public', got %s",
nsConfig.Namespace)
+ }
+ if nsConfig.HeartbeatPeriod != 5000 {
+ t.Errorf("HeartbeatPeriod default: expected 5000, got %d",
nsConfig.HeartbeatPeriod)
+ }
+ if nsConfig.MetadataMaxAgeMs != 30000 {
+ t.Errorf("MetadataMaxAgeMs default: expected 30000, got %d",
nsConfig.MetadataMaxAgeMs)
+ }
+ if nsConfig.Username != "" {
+ t.Errorf("Username default: expected empty, got %s",
nsConfig.Username)
+ }
+ if nsConfig.TokenValidityInMilliseconds != 29*60*1000 {
+ t.Errorf("TokenValidity default: expected 1740000, got %d",
nsConfig.TokenValidityInMilliseconds)
+ }
+}
+
+func TestInitRegistry_WithNamingServerConfig(t *testing.T) {
+ // Reset global registry instance
+ registryServiceInstance = nil
+ // Reset naming server singleton instance
+ resetInstance()
+
+ // Create custom configuration
+ customConfig := &RegistryConfig{
+ Type: NAMINGSERVER,
+ NamingServer: NamingServerConfig{
+ ServerAddr: "custom-server:8080",
+ Username: "init-test-user",
+ Password: "init-test-pass",
+ HeartbeatPeriod: 2000,
+ TokenValidityInMilliseconds: 600000,
+ },
+ }
+
+ // Initialize registry with custom config
+ InitRegistry(nil, customConfig)
+
+ // Verify registry initialization
+ registry := GetRegistry()
+ if registry == nil {
+ t.Fatal("InitRegistry failed: registry is nil")
+ }
+
+ // Check registry type
+ namingRegistry, ok := registry.(*NamingServerRegistryService)
+ if !ok {
+ t.Fatalf("Expected NamingServerRegistryService, got %T",
registry)
+ }
+
+ // Verify client uses custom config
+ client := namingRegistry.client
+ if client.config.ServerAddr != "custom-server:8080" {
+ t.Errorf("Client ServerAddr: expected 'custom-server:8080', got
%s", client.config.ServerAddr)
+ }
+ if client.config.Username != "init-test-user" {
+ t.Errorf("Client Username: expected 'init-test-user', got %s",
client.config.Username)
+ }
+ if client.config.Password != "init-test-pass" {
+ t.Errorf("Client Password: expected 'init-test-pass', got %s",
client.config.Password)
+ }
+ if client.config.HeartbeatPeriod != 2000 {
+ t.Errorf("Client HeartbeatPeriod: expected 2000, got %d",
client.config.HeartbeatPeriod)
+ }
+ if client.config.TokenValidityInMilliseconds != 600000 {
+ t.Errorf("Client TokenValidity: expected 600000, got %d",
client.config.TokenValidityInMilliseconds)
+ }
+}
diff --git a/pkg/discovery/base.go b/pkg/util/rand/rand.go
similarity index 68%
copy from pkg/discovery/base.go
copy to pkg/util/rand/rand.go
index fcc0d016..2fa771a4 100644
--- a/pkg/discovery/base.go
+++ b/pkg/util/rand/rand.go
@@ -15,25 +15,24 @@
* limitations under the License.
*/
-package discovery
+package rand
-const (
- FILE string = "file"
- NACOS string = "nacos"
- ETCD string = "etcd"
- EUREKA string = "eureka"
- REDIS string = "redis"
- ZK string = "zk"
- CONSUL string = "consul"
- SOFA string = "sofa"
+import (
+ "math/rand"
+ "sync"
+ "time"
)
-type ServiceInstance struct {
- Addr string
- Port int
-}
+var (
+ randGen = rand.New(rand.NewSource(time.Now().UnixNano()))
+ mu sync.Mutex
+)
-type RegistryService interface {
- Lookup(key string) ([]*ServiceInstance, error)
- Close()
+func RandIntn(n int) int {
+ if n <= 0 {
+ return 0
+ }
+ mu.Lock()
+ defer mu.Unlock()
+ return randGen.Intn(n)
}
diff --git a/testdata/conf/seatago.yml b/testdata/conf/seatago.yml
index fa8a7f82..d52b8456 100644
--- a/testdata/conf/seatago.yml
+++ b/testdata/conf/seatago.yml
@@ -120,7 +120,7 @@ seata:
data-id: seata.properties
# Registration Center
registry:
- type: file
+ type: naming-server
file:
name: seatago.yml
nacos:
@@ -136,6 +136,15 @@ seata:
etcd3:
cluster: "default"
server-addr: "http://localhost:2379"
+ naming-server:
+ cluster: "default"
+ server-addr: "127.0.0.1:8081"
+ namespace: "public"
+ heartbeat-period: 5000
+ metadata-max-age-ms: 30000
+ username: ""
+ password: ""
+ token-validity-in-milliseconds: 1740000
log:
exception-rate: 100
tcc:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]