This is an automated email from the ASF dual-hosted git repository.

goodboycoder pushed a commit to branch feature/support-namingserver
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git


The following commit(s) were added to refs/heads/feature/support-namingserver 
by this push:
     new 169cfd13 OSPP 2025 Go&Java Version alignment  /support-namingserver 
(#924)
169cfd13 is described below

commit 169cfd1317ec44cc37c3e66050e3027ba3f81f60
Author: flypiggy <[email protected]>
AuthorDate: Thu Oct 30 16:48:34 2025 +0800

    OSPP 2025 Go&Java Version alignment  /support-namingserver (#924)
    
    * upd-namingserver
    
    * upd
---
 pkg/discovery/base.go                              |  17 +-
 pkg/discovery/config.go                            |  34 +-
 pkg/discovery/init.go                              |  14 +
 pkg/discovery/naming_server.go                     | 673 ++++++++++++++++++++
 .../{base.go => naming_server_registry.go}         |  39 +-
 pkg/discovery/naming_server_test.go                | 683 +++++++++++++++++++++
 pkg/{discovery/base.go => util/rand/rand.go}       |  33 +-
 testdata/conf/seatago.yml                          |  11 +-
 8 files changed, 1456 insertions(+), 48 deletions(-)

diff --git a/pkg/discovery/base.go b/pkg/discovery/base.go
index fcc0d016..8438a808 100644
--- a/pkg/discovery/base.go
+++ b/pkg/discovery/base.go
@@ -18,14 +18,15 @@
 package discovery
 
 const (
-       FILE   string = "file"
-       NACOS  string = "nacos"
-       ETCD   string = "etcd"
-       EUREKA string = "eureka"
-       REDIS  string = "redis"
-       ZK     string = "zk"
-       CONSUL string = "consul"
-       SOFA   string = "sofa"
+       FILE         string = "file"
+       NACOS        string = "nacos"
+       ETCD         string = "etcd"
+       EUREKA       string = "eureka"
+       REDIS        string = "redis"
+       ZK           string = "zk"
+       CONSUL       string = "consul"
+       SOFA         string = "sofa"
+       NAMINGSERVER string = "namingserver"
 )
 
 type ServiceInstance struct {
diff --git a/pkg/discovery/config.go b/pkg/discovery/config.go
index d8a78874..d0375c21 100644
--- a/pkg/discovery/config.go
+++ b/pkg/discovery/config.go
@@ -38,10 +38,11 @@ func (cfg *ServiceConfig) RegisterFlagsWithPrefix(prefix 
string, f *flag.FlagSet
 }
 
 type RegistryConfig struct {
-       Type  string      `yaml:"type" json:"type" koanf:"type"`
-       File  FileConfig  `yaml:"file" json:"file" koanf:"file"`
-       Nacos NacosConfig `yaml:"nacos" json:"nacos" koanf:"nacos"`
-       Etcd3 Etcd3Config `yaml:"etcd3" json:"etcd3" koanf:"etcd3"`
+       Type         string             `yaml:"type" json:"type" koanf:"type"`
+       File         FileConfig         `yaml:"file" json:"file" koanf:"file"`
+       Nacos        NacosConfig        `yaml:"nacos" json:"nacos" 
koanf:"nacos"`
+       Etcd3        Etcd3Config        `yaml:"etcd3" json:"etcd3" 
koanf:"etcd3"`
+       NamingServer NamingServerConfig `yaml:"naming-server" 
json:"namingServer" koanf:"naming-server"`
 }
 
 func (cfg *RegistryConfig) RegisterFlagsWithPrefix(prefix string, f 
*flag.FlagSet) {
@@ -49,6 +50,7 @@ func (cfg *RegistryConfig) RegisterFlagsWithPrefix(prefix 
string, f *flag.FlagSe
        cfg.File.RegisterFlagsWithPrefix(prefix+".file", f)
        cfg.Nacos.RegisterFlagsWithPrefix(prefix+".nacos", f)
        cfg.Etcd3.RegisterFlagsWithPrefix(prefix+".etcd3", f)
+       cfg.NamingServer.RegisterFlagsWithPrefix(prefix+".naming-server", f)
 }
 
 type FileConfig struct {
@@ -90,3 +92,27 @@ func (cfg *Etcd3Config) RegisterFlagsWithPrefix(prefix 
string, f *flag.FlagSet)
        f.StringVar(&cfg.Cluster, prefix+".cluster", "default", "The server 
address of registry.")
        f.StringVar(&cfg.ServerAddr, prefix+".server-addr", 
"http://localhost:2379", "The server address of registry.")
 }
+
+type NamingServerConfig struct {
+       Cluster                     string `yaml:"cluster" json:"cluster" 
koanf:"cluster"`
+       ServerAddr                  string `yaml:"server-addr" 
json:"server-addr" koanf:"server-addr"`
+       Namespace                   string `yaml:"namespace" json:"namespace" 
koanf:"namespace"`
+       HeartbeatPeriod             int    `yaml:"heartbeat-period" 
json:"heartbeat-period" koanf:"heartbeat-period"`
+       MetadataMaxAgeMs            int64  `yaml:"metadata-max-age-ms" 
json:"metadata-max-age-ms" koanf:"metadata-max-age-ms"`
+       Username                    string `yaml:"username" json:"username" 
koanf:"username"`
+       Password                    string `yaml:"password" json:"password" 
koanf:"password"`
+       TokenValidityInMilliseconds int64  
`yaml:"token-validity-in-milliseconds" json:"token-validity-in-milliseconds" 
koanf:"token-validity-in-milliseconds"`
+       SecretKey                   string `yaml:"secret-key" json:"secret-key" 
koanf:"secret-key"`
+}
+
+func (cfg *NamingServerConfig) RegisterFlagsWithPrefix(prefix string, f 
*flag.FlagSet) {
+       f.StringVar(&cfg.Cluster, prefix+".cluster", "default", "The cluster 
name of naming server")
+       f.StringVar(&cfg.ServerAddr, prefix+".server-addr", "127.0.0.1:8081", 
"The server address of naming server (host:port)")
+       f.StringVar(&cfg.Namespace, prefix+".namespace", "public", "The 
namespace of naming server")
+       f.IntVar(&cfg.HeartbeatPeriod, prefix+".heartbeat-period", 5000, "The 
heartbeat period in milliseconds")
+       f.Int64Var(&cfg.MetadataMaxAgeMs, prefix+".metadata-max-age-ms", 30000, 
"The max age of metadata in milliseconds")
+       f.StringVar(&cfg.Username, prefix+".username", "", "The username for 
authentication")
+       f.StringVar(&cfg.Password, prefix+".password", "", "The password for 
authentication")
+       f.Int64Var(&cfg.TokenValidityInMilliseconds, 
prefix+".token-validity-in-milliseconds", 29*60*1000, "The validity period of 
token in milliseconds")
+       f.StringVar(&cfg.SecretKey, prefix+".secret-key", "", "The secret key 
for JWT authentication (matches Naming Server's secretKey)")
+}
diff --git a/pkg/discovery/init.go b/pkg/discovery/init.go
index 8e7f5e38..b4ff329e 100644
--- a/pkg/discovery/init.go
+++ b/pkg/discovery/init.go
@@ -47,6 +47,9 @@ func InitRegistry(serviceConfig *ServiceConfig, 
registryConfig *RegistryConfig)
                //TODO: init consul registry
        case SOFA:
                //TODO: init sofa registry
+       case NAMINGSERVER:
+               // init namingserver registry
+               registryService = newNamingServerRegistryService(serviceConfig, 
&registryConfig.NamingServer)
        default:
                err = fmt.Errorf("service registry not support registry 
type:%s", registryConfig.Type)
        }
@@ -60,3 +63,14 @@ func InitRegistry(serviceConfig *ServiceConfig, 
registryConfig *RegistryConfig)
 func GetRegistry() RegistryService {
        return registryServiceInstance
 }
+
+func GetNamingserverRegistry() (NamingserverRegistry, error) {
+       if registryServiceInstance == nil {
+               return nil, fmt.Errorf("registry service not initialized")
+       }
+       namingReg, ok := registryServiceInstance.(NamingserverRegistry)
+       if !ok {
+               return nil, fmt.Errorf("current registry is not namingserver")
+       }
+       return namingReg, nil
+}
diff --git a/pkg/discovery/naming_server.go b/pkg/discovery/naming_server.go
new file mode 100644
index 00000000..44117c8a
--- /dev/null
+++ b/pkg/discovery/naming_server.go
@@ -0,0 +1,673 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package discovery
+
+import (
+       "encoding/json"
+       "errors"
+       "fmt"
+       gostnet "github.com/dubbogo/gost/net"
+       "go.uber.org/zap"
+       "net/http"
+       "net/url"
+       "seata.apache.org/seata-go/pkg/util/rand"
+       "strconv"
+       "strings"
+       "sync"
+       "sync/atomic"
+       "time"
+)
+
+const (
+       httpPrefix               = "http://"
+       healthCheckThreshold     = 3
+       longPollTimeoutPeriod    = 28 * time.Second
+       authorizationHeader      = "Authorization"
+       contentTypeJSON          = "application/json"
+       namingAddrCacheTTL       = 30 * time.Second
+       failureRecoveryThreshold = 3
+       maxRetryAttempts         = 3
+       retryDelayMs             = 1000
+)
+
+type MetaResponse struct {
+       Term        int64     `json:"term"`
+       ClusterList []Cluster `json:"clusterList"`
+}
+
+type Cluster struct {
+       ClusterName string `json:"clusterName"`
+       ClusterType string `json:"clusterType"`
+       UnitData    []Unit `json:"unitData"`
+}
+
+type Unit struct {
+       UnitName           string             `json:"unitName"`
+       NamingInstanceList []NamingServerNode `json:"namingInstanceList"`
+}
+
+type NamingServerNode struct {
+       Role        ClusterRole            `json:"role"`
+       Term        int64                  `json:"term"`
+       Transaction Endpoint               `json:"transaction"`
+       Control     Endpoint               `json:"control"`
+       Internal    Endpoint               `json:"internal"`
+       Group       string                 `json:"group"`
+       Version     string                 `json:"version"`
+       Metadata    map[string]interface{} `json:"metadata"`
+       TimeStamp   int64                  `json:"timeStamp"`
+       Weight      float64                `json:"weight"`
+       Healthy     bool                   `json:"healthy"`
+       Unit        string                 `json:"unit"`
+}
+
+type Endpoint struct {
+       Host     string `json:"host"`
+       Port     int    `json:"port"`
+       Protocol string `json:"protocol"`
+}
+
+type ExternalEndpoint struct {
+       Host            string `json:"host"`
+       ControlPort     int    `json:"controlPort"`
+       TransactionPort int    `json:"transactionPort"`
+}
+
+type ClusterRole string
+
+const (
+       ClusterRoleLeader ClusterRole = "LEADER"
+       ClusterRoleMember ClusterRole = "MEMBER"
+)
+
+type NamingServerClient struct {
+       config                   *NamingServerConfig
+       logger                   *zap.Logger
+       mu                       sync.Mutex
+       instance                 *NamingServerClient
+       term                     int64
+       jwtToken                 string
+       tokenTimeStamp           int64
+       isSubscribed             bool
+       namingAddrCache          string
+       namingAddrCacheTimestamp int64
+
+       availableNamingMap sync.Map
+       vgroupAddressMap   sync.Map
+       listenerServiceMap sync.Map
+
+       healthCheckTicker *time.Ticker
+       closeChan         chan struct{}
+       wg                sync.WaitGroup
+
+       httpClient     *http.Client
+       longPollClient *http.Client
+}
+
+type NamingListener interface {
+       OnEvent(vGroup string) error
+}
+
+type NamingServerRegistryService struct {
+       client *NamingServerClient
+}
+
+var _ NamingserverRegistry = (*NamingServerRegistryService)(nil)
+
+func (n *NamingServerRegistryService) Lookup(key string) ([]*ServiceInstance, 
error) {
+       return n.client.Lookup(key)
+}
+
+func (n *NamingServerRegistryService) Close() {
+       n.client.Close()
+}
+
+func newNamingServerRegistryService(_ *ServiceConfig, cfg *NamingServerConfig) 
RegistryService {
+       client := GetInstance(cfg)
+       return &NamingServerRegistryService{
+               client: client,
+       }
+}
+
+var (
+       namingServerInstance *NamingServerClient
+       namingServerOnce     sync.Once
+)
+
+func GetInstance(config *NamingServerConfig) *NamingServerClient {
+       namingServerOnce.Do(func() {
+               namingServerInstance = &NamingServerClient{
+                       config:            config,
+                       logger:            
zap.L().Named("naming-server-client"),
+                       closeChan:         make(chan struct{}),
+                       healthCheckTicker: 
time.NewTicker(time.Duration(config.HeartbeatPeriod) * time.Millisecond),
+                       httpClient:        &http.Client{Timeout: 3 * 
time.Second},
+                       longPollClient:    &http.Client{Timeout: 30 * 
time.Second},
+               }
+               // Initialize available naming server addresses from config
+               namingServerInstance.initNamingAddrs()
+               namingServerInstance.initHealthCheck()
+       })
+       return namingServerInstance
+}
+
+func resetInstance() {
+       if namingServerInstance != nil {
+               namingServerInstance.Close()
+               namingServerInstance.mu.Lock()
+               namingServerInstance.clearNamingAddrCache()
+               namingServerInstance.mu.Unlock()
+       }
+       namingServerInstance = nil
+       namingServerOnce = sync.Once{}
+}
+
+func (c *NamingServerClient) initNamingAddrs() {
+       addrs := c.getNamingAddrs()
+       for _, addr := range addrs {
+               c.availableNamingMap.Store(addr, int32(0))
+       }
+}
+
+func (c *NamingServerClient) initHealthCheck() {
+       c.wg.Add(1)
+       go func() {
+               defer c.wg.Done()
+               for {
+                       select {
+                       case <-c.healthCheckTicker.C:
+                               urlList := c.getNamingAddrs()
+                               c.checkAvailableNamingAddr(urlList)
+                       case <-c.closeChan:
+                               return
+                       }
+               }
+       }()
+}
+
+func (c *NamingServerClient) checkAvailableNamingAddr(urlList []string) {
+       for _, addr := range urlList {
+               isHealthy := c.doHealthCheck(addr)
+
+               val, _ := c.availableNamingMap.LoadOrStore(addr, int32(0))
+               failCount := val.(int32)
+
+               if !isHealthy {
+                       newFailCount := atomic.AddInt32(&failCount, 1)
+                       c.availableNamingMap.Store(addr, newFailCount)
+                       if newFailCount == 1 {
+                               c.logger.Warn("naming server check failed", 
zap.String("addr", addr), zap.Int32("failCount", newFailCount))
+                       } else if newFailCount >= healthCheckThreshold {
+                               c.logger.Error("naming server offline", 
zap.String("addr", addr), zap.Int32("failCount", newFailCount))
+                               c.mu.Lock()
+                               if c.namingAddrCache == addr {
+                                       c.clearNamingAddrCache()
+                               }
+                               c.mu.Unlock()
+                       }
+               } else {
+                       if failCount > 0 {
+                               c.availableNamingMap.Store(addr, int32(0))
+                               c.logger.Info("naming server recovered", 
zap.String("addr", addr), zap.Int32("previousFailCount", failCount))
+                       }
+               }
+       }
+}
+
+func (c *NamingServerClient) doHealthCheck(addr string) bool {
+       checkURL := fmt.Sprintf("%s%s/naming/v1/health", httpPrefix, addr)
+
+       req, err := http.NewRequest(http.MethodGet, checkURL, nil)
+       if err != nil {
+               c.logger.Error("create health check request failed", 
zap.Error(err))
+               return false
+       }
+       req.Header.Set("Content-Type", contentTypeJSON)
+
+       resp, err := c.httpClient.Do(req)
+       if err != nil {
+               c.logger.Error("health check failed", zap.String("addr", addr), 
zap.Error(err))
+               return false
+       }
+       defer resp.Body.Close()
+       return resp.StatusCode == http.StatusOK
+}
+
+func (c *NamingServerClient) getNamingAddrs() []string {
+       return strings.Split(c.config.ServerAddr, ",")
+}
+
+func (c *NamingServerClient) Lookup(vGroup string) ([]*ServiceInstance, error) 
{
+       if !c.isSubscribed {
+               if err := c.RefreshGroup(vGroup); err != nil {
+                       return nil, fmt.Errorf("refresh group failed: %w", err)
+               }
+               listener := &RefreshListener{client: c}
+               if err := c.Subscribe(vGroup, listener); err != nil {
+                       return nil, fmt.Errorf("subscribe failed: %w", err)
+               }
+       }
+
+       val, ok := c.vgroupAddressMap.Load(vGroup)
+       if !ok {
+               if err := c.RefreshGroup(vGroup); err != nil {
+                       return nil, fmt.Errorf("refresh group failed: %w", err)
+               }
+               val, ok = c.vgroupAddressMap.Load(vGroup)
+               if !ok {
+                       return nil, errors.New("no nodes found for vgroup")
+               }
+       }
+
+       nodes := val.([]NamingServerNode)
+
+       var instances []*ServiceInstance
+       for _, node := range nodes {
+               if !node.Healthy {
+                       continue
+               }
+
+               if node.Transaction.Host == "" || node.Transaction.Port <= 0 || 
node.Transaction.Port > 65535 {
+                       c.logger.Warn("invalid node address", 
zap.String("host", node.Transaction.Host), zap.Int("port", 
node.Transaction.Port))
+                       continue
+               }
+
+               instances = append(instances, &ServiceInstance{
+                       Addr: node.Transaction.Host,
+                       Port: node.Transaction.Port,
+               })
+       }
+       return instances, nil
+}
+
+func (c *NamingServerClient) RefreshGroup(vGroup string) error {
+       namingAddr, err := c.getNamingAddr()
+       if err != nil {
+               return err
+       }
+
+       if c.isTokenExpired() {
+               if err := c.RefreshToken(namingAddr); err != nil {
+                       return err
+               }
+       }
+
+       params := url.Values{}
+       params.Add("vGroup", vGroup)
+       params.Add("namespace", c.config.Namespace)
+
+       discoveryURL := fmt.Sprintf("%s%s/naming/v1/discovery?%s", httpPrefix, 
namingAddr, params.Encode())
+       req, err := http.NewRequest(http.MethodGet, discoveryURL, nil)
+       if err != nil {
+               return err
+       }
+       if c.jwtToken != "" {
+               req.Header.Set(authorizationHeader, c.jwtToken)
+       }
+       req.Header.Set("Content-Type", contentTypeJSON)
+
+       resp, err := c.httpClient.Do(req)
+       if err != nil {
+               return fmt.Errorf("discovery request failed: %w", err)
+       }
+       defer resp.Body.Close()
+
+       if resp.StatusCode != http.StatusOK {
+               if resp.StatusCode == http.StatusUnauthorized {
+                       return fmt.Errorf("discovery failed: unauthorized 
(401), authentication required. please configure username and password in the 
naming server config")
+               }
+               return fmt.Errorf("discovery failed, status: %d", 
resp.StatusCode)
+       }
+
+       var metaResp MetaResponse
+       if err := json.NewDecoder(resp.Body).Decode(&metaResp); err != nil {
+               return fmt.Errorf("decode meta response failed: %w", err)
+       }
+
+       return c.handleMetadata(&metaResp, vGroup)
+}
+
+func (c *NamingServerClient) getNamingAddr() (string, error) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+
+       now := time.Now().UnixMilli()
+
+       if c.namingAddrCache != "" &&
+               now-c.namingAddrCacheTimestamp < 
namingAddrCacheTTL.Milliseconds() {
+
+               if val, ok := c.availableNamingMap.Load(c.namingAddrCache); ok {
+                       failCount := val.(int32)
+                       if failCount < healthCheckThreshold {
+                               return c.namingAddrCache, nil
+                       }
+               }
+
+               c.clearNamingAddrCache()
+       }
+
+       var availableAddrs []string
+       c.availableNamingMap.Range(func(key, value interface{}) bool {
+               addr := key.(string)
+               failCount := value.(int32)
+               if failCount < healthCheckThreshold {
+                       availableAddrs = append(availableAddrs, addr)
+               }
+               return true
+       })
+
+       if len(availableAddrs) == 0 {
+               return "", errors.New("no available naming server")
+       }
+
+       addr := availableAddrs[rand.RandIntn(len(availableAddrs))]
+       c.namingAddrCache = addr
+       c.namingAddrCacheTimestamp = now
+       return addr, nil
+}
+
+func (c *NamingServerClient) clearNamingAddrCache() {
+       c.namingAddrCache = ""
+       c.namingAddrCacheTimestamp = 0
+}
+
+func (c *NamingServerClient) isTokenExpired() bool {
+       if c.config.Username == "" || c.config.Password == "" {
+               return false
+       }
+
+       ts := atomic.LoadInt64(&c.tokenTimeStamp)
+       if ts == 0 {
+               return true
+       }
+
+       return time.Now().UnixMilli() >= ts+c.config.TokenValidityInMilliseconds
+}
+
+func (c *NamingServerClient) RefreshToken(addr string) error {
+       if c.config.Username == "" || c.config.Password == "" {
+               return nil
+       }
+
+       var lastErr error
+       for attempt := 1; attempt <= maxRetryAttempts; attempt++ {
+               err := c.doRefreshToken(addr)
+               if err == nil {
+                       return nil
+               }
+
+               lastErr = err
+               if attempt < maxRetryAttempts {
+                       c.logger.Warn("token refresh failed, retrying",
+                               zap.String("addr", addr),
+                               zap.Int("attempt", attempt),
+                               zap.Error(err))
+                       time.Sleep(time.Duration(retryDelayMs*attempt) * 
time.Millisecond)
+               }
+       }
+
+       c.logger.Error("token refresh failed after all retries",
+               zap.String("addr", addr),
+               zap.Int("maxAttempts", maxRetryAttempts),
+               zap.Error(lastErr))
+       return fmt.Errorf("token refresh failed after %d attempts: %w", 
maxRetryAttempts, lastErr)
+}
+
+func (c *NamingServerClient) doRefreshToken(addr string) error {
+       loginURL := fmt.Sprintf("%s%s/api/v1/auth/login", httpPrefix, addr)
+
+       loginReq := struct {
+               Username string `json:"username"`
+               Password string `json:"password"`
+       }{
+               Username: c.config.Username,
+               Password: c.config.Password,
+       }
+
+       body, err := json.Marshal(loginReq)
+       if err != nil {
+               return fmt.Errorf("marshal login request failed: %w", err)
+       }
+
+       req, err := http.NewRequest(http.MethodPost, loginURL, 
strings.NewReader(string(body)))
+       if err != nil {
+               return err
+       }
+       req.Header.Set("Content-Type", contentTypeJSON)
+
+       resp, err := c.httpClient.Do(req)
+       if err != nil {
+               return fmt.Errorf("login request failed: %w", err)
+       }
+       defer resp.Body.Close()
+
+       if resp.StatusCode != http.StatusOK {
+               c.logger.Error("login request failed",
+                       zap.String("addr", addr),
+                       zap.Int("statusCode", resp.StatusCode),
+                       zap.String("username", c.config.Username))
+               return fmt.Errorf("authentication failed: status %d", 
resp.StatusCode)
+       }
+
+       var loginResp struct {
+               Success bool        `json:"success"`
+               Data    string      `json:"data"`
+               Code    interface{} `json:"code"`
+               Msg     string      `json:"msg"`
+       }
+       if err := json.NewDecoder(resp.Body).Decode(&loginResp); err != nil {
+               return fmt.Errorf("decode login response failed: %w", err)
+       }
+
+       c.logger.Debug("login response received",
+               zap.Bool("success", loginResp.Success),
+               zap.String("code", fmt.Sprint(loginResp.Code)),
+               zap.String("msg", loginResp.Msg),
+               zap.String("data", loginResp.Data))
+
+       if !loginResp.Success {
+               errMsg := fmt.Sprintf("authentication failed: success=false, 
msg=%s", loginResp.Msg)
+               c.logger.Error(errMsg)
+               return fmt.Errorf(errMsg)
+       }
+       if loginResp.Data == "" {
+               return fmt.Errorf("authentication failed: token is empty")
+       }
+
+       c.jwtToken = loginResp.Data
+       atomic.StoreInt64(&c.tokenTimeStamp, time.Now().UnixMilli())
+       c.logger.Info("token refreshed successfully", zap.String("addr", addr))
+       return nil
+}
+
+func (c *NamingServerClient) handleMetadata(metaResp *MetaResponse, vGroup 
string) error {
+       if metaResp.Term > 0 {
+               atomic.StoreInt64(&c.term, metaResp.Term)
+       }
+
+       var newNodes []NamingServerNode
+       for _, cluster := range metaResp.ClusterList {
+               for _, unit := range cluster.UnitData {
+                       for _, node := range unit.NamingInstanceList {
+                               if (node.Role == ClusterRoleLeader && node.Term 
>= atomic.LoadInt64(&c.term)) ||
+                                       node.Role == ClusterRoleMember {
+                                       newNodes = append(newNodes, node)
+                               }
+                       }
+               }
+       }
+
+       c.vgroupAddressMap.Store(vGroup, newNodes)
+       return nil
+}
+
+type RefreshListener struct {
+       client *NamingServerClient
+}
+
+func (l *RefreshListener) OnEvent(vGroup string) error {
+       return l.client.RefreshGroup(vGroup)
+}
+
+func (c *NamingServerClient) Subscribe(vGroup string, listener NamingListener) 
error {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+
+       val, _ := c.listenerServiceMap.LoadOrStore(vGroup, []NamingListener{})
+       listeners := append(val.([]NamingListener), listener)
+       c.listenerServiceMap.Store(vGroup, listeners)
+
+       if !c.isSubscribed {
+               c.isSubscribed = true
+               c.wg.Add(1)
+               go c.watchLoop(vGroup)
+       }
+       return nil
+}
+
+func (c *NamingServerClient) watchLoop(vGroup string) {
+       defer c.wg.Done()
+       var retryCount int
+
+       for {
+               select {
+               case <-c.closeChan:
+                       return
+               default:
+               }
+
+               changed, err := c.Watch(vGroup)
+               if err != nil {
+                       retryCount++
+                       if retryCount > failureRecoveryThreshold {
+                               c.logger.Error("watch failed continuously, will 
retry",
+                                       zap.Error(err), zap.Int("retryCount", 
retryCount))
+                               select {
+                               case <-time.After(time.Duration(retryDelayMs) * 
time.Millisecond):
+                               case <-c.closeChan:
+                                       return
+                               }
+                       } else {
+                               c.logger.Warn("watch error, will retry 
immediately", zap.Error(err))
+                       }
+                       continue
+               }
+
+               retryCount = 0
+
+               if changed {
+                       if err := c.RefreshGroup(vGroup); err != nil {
+                               c.logger.Error("refresh group failed in watch", 
zap.Error(err))
+                               continue
+                       }
+                       val, ok := c.listenerServiceMap.Load(vGroup)
+                       if !ok {
+                               continue
+                       }
+                       for _, listener := range val.([]NamingListener) {
+                               if err := listener.OnEvent(vGroup); err != nil {
+                                       c.logger.Warn("listener callback 
failed", zap.Error(err))
+                               }
+                       }
+               }
+       }
+}
+
+func (c *NamingServerClient) Watch(vGroup string) (bool, error) {
+       namingAddr, err := c.getNamingAddr()
+       if err != nil {
+               return false, err
+       }
+
+       if c.isTokenExpired() {
+               if err := c.RefreshToken(namingAddr); err != nil {
+                       return false, err
+               }
+       }
+
+       clientIP, err := gostnet.GetLocalIP()
+       if err != nil {
+               return false, fmt.Errorf("failed to get local IP: %w", err)
+       }
+
+       params := url.Values{}
+       params.Add("vGroup", vGroup)
+       params.Add("clientTerm", strconv.FormatInt(atomic.LoadInt64(&c.term), 
10))
+       params.Add("timeout", 
strconv.FormatInt(longPollTimeoutPeriod.Milliseconds(), 10))
+       params.Add("clientAddr", clientIP)
+
+       watchURL := fmt.Sprintf("%s%s/naming/v1/watch?%s", httpPrefix, 
namingAddr, params.Encode())
+       req, err := http.NewRequest(http.MethodPost, watchURL, nil)
+       if err != nil {
+               return false, err
+       }
+       if c.jwtToken != "" {
+               req.Header.Set(authorizationHeader, c.jwtToken)
+       }
+       req.Header.Set("Content-Type", contentTypeJSON)
+
+       resp, err := c.longPollClient.Do(req)
+       if err != nil {
+               return false, fmt.Errorf("watch request failed: %w", err)
+       }
+       defer resp.Body.Close()
+
+       switch resp.StatusCode {
+       case http.StatusOK:
+               return true, nil
+       case http.StatusNotModified:
+               return false, nil
+       case http.StatusUnauthorized:
+               return false, fmt.Errorf("watch failed: unauthorized (401), 
token may have expired or authentication is required")
+       default:
+               return false, fmt.Errorf("watch request failed with status 
code: %d", resp.StatusCode)
+       }
+}
+
+func (c *NamingServerClient) Close() {
+       close(c.closeChan)
+       c.healthCheckTicker.Stop()
+       c.wg.Wait()
+       c.logger.Info("naming server client closed")
+}
+
+func (n *NamingServerRegistryService) Register(instance *ServiceInstance) 
error {
+       //TODO implement me
+       panic("implement me")
+}
+
+func (n *NamingServerRegistryService) Deregister(instance *ServiceInstance) 
error {
+       //TODO implement me
+       panic("implement me")
+}
+
+func (n *NamingServerRegistryService) doHealthCheck(addr string) bool {
+       return n.client.doHealthCheck(addr)
+}
+
+func (n *NamingServerRegistryService) RefreshToken(addr string) error {
+       return n.client.RefreshToken(addr)
+}
+
+func (n *NamingServerRegistryService) RefreshGroup(vGroup string) error {
+       return n.client.RefreshGroup(vGroup)
+}
+
+func (n *NamingServerRegistryService) Watch(vGroup string) (bool, error) {
+       return n.client.Watch(vGroup)
+}
diff --git a/pkg/discovery/base.go b/pkg/discovery/naming_server_registry.go
similarity index 56%
copy from pkg/discovery/base.go
copy to pkg/discovery/naming_server_registry.go
index fcc0d016..f5289fd8 100644
--- a/pkg/discovery/base.go
+++ b/pkg/discovery/naming_server_registry.go
@@ -17,23 +17,26 @@
 
 package discovery
 
-const (
-       FILE   string = "file"
-       NACOS  string = "nacos"
-       ETCD   string = "etcd"
-       EUREKA string = "eureka"
-       REDIS  string = "redis"
-       ZK     string = "zk"
-       CONSUL string = "consul"
-       SOFA   string = "sofa"
-)
-
-type ServiceInstance struct {
-       Addr string
-       Port int
-}
+type NamingserverRegistry interface {
+       RegistryService
+
+       Register(instance *ServiceInstance) error
+
+       Deregister(instance *ServiceInstance) error
+
+       // doHealthCheck performs a health check on addr by calling the
+       // /naming/v1/health interface.
+       doHealthCheck(addr string) bool
+
+       // RefreshToken refreshes the JWT token by calling the
+       // /api/v1/auth/login interface on addr.
+       RefreshToken(addr string) error
+
+       // RefreshGroup refreshes the service group information for vGroup by
+       // calling the /naming/v1/discovery interface.
+       RefreshGroup(vGroup string) error
 
-type RegistryService interface {
-       Lookup(key string) ([]*ServiceInstance, error)
-       Close()
+       // Watch monitors service changes for vGroup by calling the
+       // /naming/v1/watch interface.
+       Watch(vGroup string) (bool, error)
 }
diff --git a/pkg/discovery/naming_server_test.go 
b/pkg/discovery/naming_server_test.go
new file mode 100644
index 00000000..f342c631
--- /dev/null
+++ b/pkg/discovery/naming_server_test.go
@@ -0,0 +1,683 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package discovery
+
+import (
+       "encoding/json"
+       "flag"
+       "go.uber.org/zap"
+       "gopkg.in/yaml.v2"
+       "net/http"
+       "net/http/httptest"
+       "os"
+       "sync/atomic"
+       "testing"
+       "time"
+)
+
+// testConfig is the baseline fake config shared by the tests in this file; copy it (cfg := *testConfig) rather than mutating it.
+var testConfig = &NamingServerConfig{
+       ServerAddr:                  "127.0.0.1:8080,localhost:9090",
+       HeartbeatPeriod:             1000,
+       Namespace:                   "testns",
+       Username:                    "user",
+       Password:                    "pass",
+       TokenValidityInMilliseconds: 100,
+       MetadataMaxAgeMs:            1000,
+}
+
+// TestGetNamingAddrs verifies that getNamingAddrs yields one entry per address
+// listed in config.ServerAddr (two for the shared test config).
+func TestGetNamingAddrs(t *testing.T) {
+       c := &NamingServerClient{
+               config:         testConfig,
+               httpClient:     &http.Client{Timeout: 3 * time.Second},
+               longPollClient: &http.Client{Timeout: 30 * time.Second},
+       }
+       if got := len(c.getNamingAddrs()); got != 2 {
+               t.Errorf("expected 2 addresses, got %d", got)
+       }
+}
+
+// TestIsTokenExpired_NoCredentials checks that isTokenExpired reports false
+// when neither username nor password is configured, even with a zero token
+// timestamp.
+func TestIsTokenExpired_NoCredentials(t *testing.T) {
+       anonymous := *testConfig
+       anonymous.Username, anonymous.Password = "", ""
+       c := &NamingServerClient{
+               config:         &anonymous,
+               httpClient:     &http.Client{Timeout: 3 * time.Second},
+               longPollClient: &http.Client{Timeout: 30 * time.Second},
+       }
+
+       // Without credentials the token must never be treated as expired.
+       atomic.StoreInt64(&c.tokenTimeStamp, 0)
+       if expired := c.isTokenExpired(); expired {
+               t.Error("expected token not expired when no credentials")
+       }
+}
+
+func TestIsTokenExpired_WithCredentials(t *testing.T) {
+       client := &NamingServerClient{
+               config:         testConfig,
+               httpClient:     &http.Client{Timeout: 3 * time.Second},
+               longPollClient: &http.Client{Timeout: 30 * time.Second},
+       }
+       // timestamp zero => expired
+       atomic.StoreInt64(&client.tokenTimeStamp, 0)
+       if !client.isTokenExpired() {
+               t.Error("expected token expired when timestamp zero and 
credentials present")
+       }
+       // future timestamp => not expired
+       atomic.StoreInt64(&client.tokenTimeStamp, time.Now().UnixMilli())
+       if client.isTokenExpired() {
+               t.Error("expected token not expired when timestamp fresh")
+       }
+}
+
+// TestDoHealthCheck runs doHealthCheck against two local httptest servers: one
+// answering 200 OK and one answering 500 Internal Server Error.
+func TestDoHealthCheck(t *testing.T) {
+       c := &NamingServerClient{
+               logger:         zap.NewNop(),
+               httpClient:     &http.Client{Timeout: 3 * time.Second},
+               longPollClient: &http.Client{Timeout: 30 * time.Second},
+       }
+       // serve starts a server that answers every request with status.
+       serve := func(status int) *httptest.Server {
+               return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+                       w.WriteHeader(status)
+               }))
+       }
+
+       okServer := serve(http.StatusOK)
+       defer okServer.Close()
+       if healthy := c.doHealthCheck(okServer.Listener.Addr().String()); !healthy {
+               t.Error("expected healthy server to return true")
+       }
+
+       failingServer := serve(http.StatusInternalServerError)
+       defer failingServer.Close()
+       if healthy := c.doHealthCheck(failingServer.Listener.Addr().String()); healthy {
+               t.Error("expected unhealthy server to return false")
+       }
+}
+
+func TestHealthCheckThreshold(t *testing.T) {
+       resetInstance()
+       client := &NamingServerClient{
+               config:         testConfig,
+               logger:         zap.NewNop(),
+               httpClient:     &http.Client{Timeout: 3 * time.Second},
+               longPollClient: &http.Client{Timeout: 30 * time.Second},
+       }
+
+       addr := "test-addr"
+
+       client.availableNamingMap.Store(addr, int32(0))
+       val, _ := client.availableNamingMap.Load(addr)
+       if val.(int32) != 0 {
+               t.Error("initial fail count should be 0")
+       }
+
+       client.checkAvailableNamingAddr([]string{addr})
+       val, _ = client.availableNamingMap.Load(addr)
+       if val.(int32) != 1 {
+               t.Error("fail count should be 1 after first failure")
+       }
+
+       client.checkAvailableNamingAddr([]string{addr})
+       val, _ = client.availableNamingMap.Load(addr)
+       if val.(int32) != 2 {
+               t.Error("fail count should be 2 after second failure")
+       }
+
+       client.checkAvailableNamingAddr([]string{addr})
+       val, _ = client.availableNamingMap.Load(addr)
+       if val.(int32) != 3 {
+               t.Error("fail count should be 3 after third failure")
+       }
+}
+
+// TestHandleMetadata feeds handleMetadata a response whose term is 2 and checks
+// how many of its three nodes end up cached for the vgroup.
+func TestHandleMetadata(t *testing.T) {
+       c := &NamingServerClient{
+               httpClient:     &http.Client{Timeout: 3 * time.Second},
+               longPollClient: &http.Client{Timeout: 30 * time.Second},
+       }
+       instances := []NamingServerNode{
+               {Role: ClusterRoleLeader, Term: 2, Healthy: true},
+               {Role: ClusterRoleLeader, Term: 1, Healthy: true},
+               {Role: ClusterRoleMember, Term: 1, Healthy: false},
+       }
+       response := &MetaResponse{
+               Term: 2,
+               ClusterList: []Cluster{{
+                       ClusterName: "c1",
+                       ClusterType: "t1",
+                       UnitData:    []Unit{{UnitName: "u1", NamingInstanceList: instances}},
+               }},
+       }
+
+       if err := c.handleMetadata(response, "vg1"); err != nil {
+               t.Fatalf("handleMetadata returned error: %v", err)
+       }
+       cached, found := c.vgroupAddressMap.Load("vg1")
+       if !found {
+               t.Fatal("vgroupAddressMap missing key vg1")
+       }
+       // Only the leader whose term is >= 2 and the member are expected to
+       // remain; the leader with term 1 should be filtered out.
+       if kept := cached.([]NamingServerNode); len(kept) != 2 {
+               t.Errorf("expected 2 nodes, got %d", len(kept))
+       }
+}
+
+// TestGetNamingAddr_NoAvailable checks that getNamingAddr returns an error when
+// availableNamingMap holds no entries.
+func TestGetNamingAddr_NoAvailable(t *testing.T) {
+       c := &NamingServerClient{
+               config:         testConfig,
+               httpClient:     &http.Client{Timeout: 3 * time.Second},
+               longPollClient: &http.Client{Timeout: 30 * time.Second},
+       }
+       // availableNamingMap is deliberately left empty.
+       if _, err := c.getNamingAddr(); err == nil {
+               t.Error("expected error when no available naming server")
+       }
+}
+
+// TestGetNamingAddr_OneAvailable checks that getNamingAddr returns the only
+// address stored in availableNamingMap.
+func TestGetNamingAddr_OneAvailable(t *testing.T) {
+       c := &NamingServerClient{
+               config:         testConfig,
+               httpClient:     &http.Client{Timeout: 3 * time.Second},
+               longPollClient: &http.Client{Timeout: 30 * time.Second},
+       }
+       c.availableNamingMap.Store("host1", int32(0))
+
+       got, err := c.getNamingAddr()
+       switch {
+       case err != nil:
+               t.Fatalf("unexpected error: %v", err)
+       case got != "host1":
+               t.Errorf("expected host1, got %s", got)
+       }
+}
+
+// TestGetNamingAddr_CacheExpiry backdates namingAddrCacheTimestamp by 31
+// seconds and checks that a subsequent getNamingAddr call moves the timestamp
+// to within the last 30 seconds.
+func TestGetNamingAddr_CacheExpiry(t *testing.T) {
+       resetInstance()
+       c := &NamingServerClient{
+               config:         testConfig,
+               httpClient:     &http.Client{Timeout: 3 * time.Second},
+               longPollClient: &http.Client{Timeout: 30 * time.Second},
+       }
+       for _, host := range []string{"host1", "host2"} {
+               c.availableNamingMap.Store(host, int32(0))
+       }
+
+       if _, err := c.getNamingAddr(); err != nil {
+               t.Fatalf("unexpected error: %v", err)
+       }
+
+       // Backdate the cache timestamp, then look the address up again.
+       c.namingAddrCacheTimestamp = time.Now().UnixMilli() - 31000
+       if _, err := c.getNamingAddr(); err != nil {
+               t.Fatalf("unexpected error: %v", err)
+       }
+
+       oldestFresh := time.Now().UnixMilli() - 30000
+       if c.namingAddrCacheTimestamp <= oldestFresh {
+               t.Error("cache timestamp should be updated after expiry")
+       }
+}
+
+// TestLookup_FirstCall runs Lookup against a mock naming server that serves the
+// login, discovery and health endpoints, and checks that only the two valid
+// nodes out of the four returned are reported as instances.
+//
+// The client is configured from a private copy of testConfig, so pointing
+// ServerAddr at the mock server does not leak into other tests that share
+// testConfig.
+func TestLookup_FirstCall(t *testing.T) {
+       // Reset naming server singleton instance
+       resetInstance()
+       // Create mock naming server with necessary endpoints (login, discovery, health)
+       mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               t.Logf("Received request: %s %s", r.Method, r.URL.Path)
+
+               switch r.URL.Path {
+               case "/api/v1/auth/login":
+                       // Handle login request, return valid JWT token
+                       w.Header().Set("Content-Type", contentTypeJSON)
+                       w.WriteHeader(http.StatusOK)
+                       _ = json.NewEncoder(w).Encode(map[string]string{
+                               "code": "200",
+                               "data": "mock-jwt-token",
+                       })
+
+               case "/naming/v1/discovery":
+                       // Verify auth token
+                       if r.Header.Get(authorizationHeader) != "mock-jwt-token" {
+                               t.Errorf("Missing valid token: %s", r.Header.Get(authorizationHeader))
+                               w.WriteHeader(http.StatusUnauthorized)
+                               return
+                       }
+
+                       // Construct mock response with 4 nodes (2 valid, 2 invalid)
+                       resp := MetaResponse{
+                               Term: 1,
+                               ClusterList: []Cluster{{
+                                       ClusterName: "testCluster",
+                                       ClusterType: "SEATA",
+                                       UnitData: []Unit{{
+                                               UnitName: "testUnit",
+                                               NamingInstanceList: []NamingServerNode{
+                                                       {Role: ClusterRoleLeader, Term: 1, Healthy: true, Transaction: Endpoint{Host: "10.0.0.1", Port: 8091, Protocol: "tcp"}, Version: "1.0.0"},
+                                                       {Role: ClusterRoleMember, Term: 1, Healthy: true, Transaction: Endpoint{Host: "10.0.0.2", Port: 8092, Protocol: "tcp"}, Version: "1.0.0"},
+                                                       {Healthy: false, Transaction: Endpoint{Host: "10.0.0.3", Port: 8093}},
+                                                       {Healthy: true, Transaction: Endpoint{Host: "10.0.0.4", Port: 70000}},
+                                               },
+                                       }},
+                               }},
+                       }
+
+                       w.Header().Set("Content-Type", contentTypeJSON)
+                       w.WriteHeader(http.StatusOK)
+                       if err := json.NewEncoder(w).Encode(resp); err != nil {
+                               t.Errorf("Failed to encode response: %v", err)
+                       }
+
+               case "/naming/v1/health":
+                       w.WriteHeader(http.StatusOK)
+
+               default:
+                       t.Errorf("Unknown path: %s", r.URL.Path)
+                       w.WriteHeader(http.StatusNotFound)
+               }
+       }))
+       defer mockServer.Close()
+
+       // Configure client to use mock server. Work on a copy of the shared
+       // config so that other tests still see the original ServerAddr.
+       mockAddr := mockServer.Listener.Addr().String()
+       cfg := *testConfig
+       cfg.ServerAddr = mockAddr
+       client := GetInstance(&cfg)
+       client.logger = zap.NewNop()
+       client.availableNamingMap.Store(mockAddr, int32(0))
+       client.namingAddrCache = mockAddr
+
+       // Call Lookup and check for errors
+       instances, err := client.Lookup("testGroup")
+       if err != nil {
+               t.Fatalf("Lookup failed: %v", err)
+       }
+
+       // Verify cached data exists
+       val, ok := client.vgroupAddressMap.Load("testGroup")
+       if !ok {
+               t.Fatal("No testGroup data in cache")
+       }
+       nodes := val.([]NamingServerNode)
+       t.Logf("Fetched %d nodes from server", len(nodes))
+       for i, node := range nodes {
+               t.Logf("Node %d: Healthy=%v, Host=%s, Port=%d", i, node.Healthy, node.Transaction.Host, node.Transaction.Port)
+       }
+
+       // Verify correct number of valid instances (2 expected)
+       if len(instances) != 2 {
+               t.Fatalf("Expected 2 instances, got %d", len(instances))
+       }
+       expected := map[string]int{"10.0.0.1": 8091, "10.0.0.2": 8092}
+       for _, inst := range instances {
+               if port, ok := expected[inst.Addr]; !ok || inst.Port != port {
+                       t.Errorf("Mismatched instance: %s:%d", inst.Addr, inst.Port)
+               }
+       }
+}
+
+// TestRefreshToken_Success checks that RefreshToken stores the token returned
+// by a mock login endpoint in the client's jwtToken field.
+func TestRefreshToken_Success(t *testing.T) {
+       resetInstance()
+
+       // loginHandler validates the login request and answers with a token.
+       loginHandler := func(w http.ResponseWriter, r *http.Request) {
+               if !(r.URL.Path == "/api/v1/auth/login" && r.Method == http.MethodPost) {
+                       t.Errorf("Invalid request: %s %s", r.Method, r.URL.Path)
+               }
+               query := r.URL.Query()
+               if !(query.Get("username") == testConfig.Username && query.Get("password") == testConfig.Password) {
+                       t.Error("Invalid username/password in request")
+               }
+               w.WriteHeader(http.StatusOK)
+               _ = json.NewEncoder(w).Encode(map[string]string{
+                       "code": "200",
+                       "data": "mock-jwt-token",
+               })
+       }
+       loginServer := httptest.NewServer(http.HandlerFunc(loginHandler))
+       defer loginServer.Close()
+
+       c := GetInstance(testConfig)
+       c.logger = zap.NewNop()
+       if err := c.RefreshToken(loginServer.Listener.Addr().String()); err != nil {
+               t.Fatalf("RefreshToken failed: %v", err)
+       }
+       if c.jwtToken != "mock-jwt-token" {
+               t.Error("jwtToken not updated after login")
+       }
+}
+
+// TestRefreshToken_RetryMechanism checks that RefreshToken keeps retrying after
+// failed logins: the mock answers 500 twice and succeeds on the third request.
+func TestRefreshToken_RetryMechanism(t *testing.T) {
+       resetInstance()
+
+       requests := 0
+       flakyLogin := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+               requests++
+               if requests >= 3 {
+                       w.WriteHeader(http.StatusOK)
+                       _ = json.NewEncoder(w).Encode(map[string]string{
+                               "code": "200",
+                               "data": "retry-success-token",
+                       })
+                       return
+               }
+               w.WriteHeader(http.StatusInternalServerError)
+       }))
+       defer flakyLogin.Close()
+
+       c := GetInstance(testConfig)
+       c.logger = zap.NewNop()
+       if err := c.RefreshToken(flakyLogin.Listener.Addr().String()); err != nil {
+               t.Fatalf("RefreshToken failed: %v", err)
+       }
+       if requests != 3 {
+               t.Errorf("Expected 3 attempts, got %d", requests)
+       }
+       if c.jwtToken != "retry-success-token" {
+               t.Error("jwtToken not updated after retry success")
+       }
+}
+
+// TestWatch checks that Watch reports a change when the mock watch endpoint
+// answers 200 OK, and that the watch request carries the expected token,
+// method and query parameters.
+func TestWatch(t *testing.T) {
+       resetInstance()
+
+       // handleLogin validates the login request and issues the mock token.
+       handleLogin := func(w http.ResponseWriter, r *http.Request) {
+               if r.Method != http.MethodPost {
+                       t.Errorf("login: Expected POST, got %s", r.Method)
+               }
+               query := r.URL.Query()
+               if !(query.Get("username") == testConfig.Username && query.Get("password") == testConfig.Password) {
+                       t.Error("login: Invalid username/password")
+               }
+               w.Header().Set("Content-Type", contentTypeJSON)
+               w.WriteHeader(http.StatusOK)
+               _ = json.NewEncoder(w).Encode(map[string]string{
+                       "code": "200",
+                       "data": "mock-jwt-token",
+               })
+       }
+
+       // handleWatch validates the watch request and answers 200 OK.
+       handleWatch := func(w http.ResponseWriter, r *http.Request) {
+               if token := r.Header.Get(authorizationHeader); token != "mock-jwt-token" {
+                       t.Errorf("watch: Missing valid token, got %s", token)
+                       w.WriteHeader(http.StatusUnauthorized)
+                       return
+               }
+               if r.Method != http.MethodPost {
+                       t.Errorf("watch: Expected POST, got %s", r.Method)
+               }
+               query := r.URL.Query()
+               if group := query.Get("vGroup"); group != "testGroup" {
+                       t.Errorf("watch: Invalid vGroup, got %s", group)
+               }
+               if term := query.Get("clientTerm"); term != "0" {
+                       t.Errorf("watch: Invalid clientTerm, got %s", term)
+               }
+               w.WriteHeader(http.StatusOK)
+       }
+
+       namingServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               switch r.URL.Path {
+               case "/api/v1/auth/login":
+                       handleLogin(w, r)
+               case "/naming/v1/watch":
+                       handleWatch(w, r)
+               default:
+                       t.Errorf("Unknown path: %s", r.URL.Path)
+                       w.WriteHeader(http.StatusNotFound)
+               }
+       }))
+       defer namingServer.Close()
+
+       // Point the client at the mock server.
+       addr := namingServer.Listener.Addr().String()
+       c := GetInstance(testConfig)
+       c.logger = zap.NewNop()
+       c.availableNamingMap.Store(addr, int32(0))
+       c.namingAddrCache = addr
+
+       changed, err := c.Watch("testGroup")
+       switch {
+       case err != nil:
+               t.Fatalf("Watch failed: %v", err)
+       case !changed:
+               t.Error("Expected change detected (changed=true), got false")
+       }
+}
+
+// TestWatch_ErrorResponse checks that Watch returns an error when the server
+// answers every request with 500 Internal Server Error.
+func TestWatch_ErrorResponse(t *testing.T) {
+       resetInstance()
+
+       alwaysFailing := func(w http.ResponseWriter, _ *http.Request) {
+               w.WriteHeader(http.StatusInternalServerError)
+       }
+       failingServer := httptest.NewServer(http.HandlerFunc(alwaysFailing))
+       defer failingServer.Close()
+
+       c := GetInstance(testConfig)
+       c.logger = zap.NewNop()
+       addr := failingServer.Listener.Addr().String()
+       c.availableNamingMap.Store(addr, int32(0))
+       c.namingAddrCache = addr
+
+       // A non-200 status must surface as an error.
+       if _, err := c.Watch("testGroup"); err == nil {
+               t.Error("Expected error from Watch, got nil")
+       }
+}
+
+// TestNamingServerConfig_LoadFromYAML writes a naming-server YAML section to a
+// temporary file, unmarshals it into NamingServerConfig and checks that the
+// parsed fields carry the values from the file.
+func TestNamingServerConfig_LoadFromYAML(t *testing.T) {
+       // YAML document with custom naming-server values.
+       const configYAML = `
+naming-server:
+  cluster: "test-cluster"
+  server-addr: "192.168.1.100:8888"
+  namespace: "test-ns"
+  heartbeat-period: 3000
+  metadata-max-age-ms: 60000
+  username: "test-user"
+  password: "test-pass"
+  token-validity-in-milliseconds: 3600000
+`
+
+       // Round-trip the document through a temporary file on disk.
+       yamlFile, err := os.CreateTemp("", "naming-server-test-*.yaml")
+       if err != nil {
+               t.Fatalf("Failed to create temp file: %v", err)
+       }
+       defer os.Remove(yamlFile.Name())
+
+       if _, err := yamlFile.WriteString(configYAML); err != nil {
+               t.Fatalf("Failed to write YAML: %v", err)
+       }
+       yamlFile.Close()
+
+       raw, err := os.ReadFile(yamlFile.Name())
+       if err != nil {
+               t.Fatalf("Failed to read temp file: %v", err)
+       }
+       var parsed struct {
+               NamingServer NamingServerConfig `yaml:"naming-server"`
+       }
+       if err := yaml.Unmarshal(raw, &parsed); err != nil {
+               t.Fatalf("Failed to unmarshal YAML: %v", err)
+       }
+
+       // Each parsed field must match the value written above.
+       ns := parsed.NamingServer
+       if got := ns.Cluster; got != "test-cluster" {
+               t.Errorf("Cluster: expected 'test-cluster', got %s", got)
+       }
+       if got := ns.ServerAddr; got != "192.168.1.100:8888" {
+               t.Errorf("ServerAddr: expected '192.168.1.100:8888', got %s", got)
+       }
+       if got := ns.Namespace; got != "test-ns" {
+               t.Errorf("Namespace: expected 'test-ns', got %s", got)
+       }
+       if got := ns.HeartbeatPeriod; got != 3000 {
+               t.Errorf("HeartbeatPeriod: expected 3000, got %d", got)
+       }
+       if got := ns.Username; got != "test-user" {
+               t.Errorf("Username: expected 'test-user', got %s", got)
+       }
+       if got := ns.Password; got != "test-pass" {
+               t.Errorf("Password: expected 'test-pass', got %s", got)
+       }
+       if got := ns.TokenValidityInMilliseconds; got != 3600000 {
+               t.Errorf("TokenValidity: expected 3600000, got %d", got)
+       }
+}
+
+// TestNamingServerConfig_DefaultValues checks the defaults that
+// RegisterFlagsWithPrefix assigns to NamingServerConfig. It overlays an empty
+// `naming-server: {}` YAML section on top of them and verifies that the
+// defaults are still in place afterwards.
+//
+// Every I/O step is checked, including reading the temporary file back, so a
+// failed read stops the test instead of silently unmarshalling empty data.
+func TestNamingServerConfig_DefaultValues(t *testing.T) {
+       // Initialize default values using flag registration
+       var nsConfig NamingServerConfig
+       fs := flag.NewFlagSet("test", flag.ContinueOnError)
+       nsConfig.RegisterFlagsWithPrefix("naming-server", fs)
+       if err := fs.Parse([]string{}); err != nil {
+               t.Fatalf("Failed to parse flags: %v", err)
+       }
+
+       // Parse empty YAML configuration
+       yamlContent := `naming-server: {}`
+       tmpFile, err := os.CreateTemp("", "naming-server-default-*.yaml")
+       if err != nil {
+               t.Fatalf("Failed to create temp file: %v", err)
+       }
+       defer os.Remove(tmpFile.Name())
+       if _, err := tmpFile.WriteString(yamlContent); err != nil {
+               t.Fatalf("Failed to write YAML: %v", err)
+       }
+       tmpFile.Close()
+
+       // Load empty config from YAML
+       var tmpConfig struct {
+               NamingServer NamingServerConfig `yaml:"naming-server"`
+       }
+       data, err := os.ReadFile(tmpFile.Name())
+       if err != nil {
+               t.Fatalf("Failed to read temp file: %v", err)
+       }
+       if err := yaml.Unmarshal(data, &tmpConfig); err != nil {
+               t.Fatalf("Failed to unmarshal YAML: %v", err)
+       }
+
+       // Merge YAML values with defaults (YAML takes precedence)
+       if tmpConfig.NamingServer.ServerAddr != "" {
+               nsConfig.ServerAddr = tmpConfig.NamingServer.ServerAddr
+       }
+       if tmpConfig.NamingServer.Namespace != "" {
+               nsConfig.Namespace = tmpConfig.NamingServer.Namespace
+       }
+       if tmpConfig.NamingServer.HeartbeatPeriod != 0 {
+               nsConfig.HeartbeatPeriod = tmpConfig.NamingServer.HeartbeatPeriod
+       }
+       if tmpConfig.NamingServer.MetadataMaxAgeMs != 0 {
+               nsConfig.MetadataMaxAgeMs = tmpConfig.NamingServer.MetadataMaxAgeMs
+       }
+       if tmpConfig.NamingServer.Username != "" {
+               nsConfig.Username = tmpConfig.NamingServer.Username
+       }
+       if tmpConfig.NamingServer.TokenValidityInMilliseconds != 0 {
+               nsConfig.TokenValidityInMilliseconds = tmpConfig.NamingServer.TokenValidityInMilliseconds
+       }
+
+       // Verify default values match expected
+       if nsConfig.ServerAddr != "127.0.0.1:8081" {
+               t.Errorf("ServerAddr default: expected '127.0.0.1:8081', got %s", nsConfig.ServerAddr)
+       }
+       if nsConfig.Namespace != "public" {
+               t.Errorf("Namespace default: expected 'public', got %s", nsConfig.Namespace)
+       }
+       if nsConfig.HeartbeatPeriod != 5000 {
+               t.Errorf("HeartbeatPeriod default: expected 5000, got %d", nsConfig.HeartbeatPeriod)
+       }
+       if nsConfig.MetadataMaxAgeMs != 30000 {
+               t.Errorf("MetadataMaxAgeMs default: expected 30000, got %d", nsConfig.MetadataMaxAgeMs)
+       }
+       if nsConfig.Username != "" {
+               t.Errorf("Username default: expected empty, got %s", nsConfig.Username)
+       }
+       if nsConfig.TokenValidityInMilliseconds != 29*60*1000 {
+               t.Errorf("TokenValidity default: expected 1740000, got %d", nsConfig.TokenValidityInMilliseconds)
+       }
+}
+
+func TestInitRegistry_WithNamingServerConfig(t *testing.T) {
+       // Reset global registry instance
+       registryServiceInstance = nil
+       // Reset naming server singleton instance
+       resetInstance()
+
+       // Create custom configuration
+       customConfig := &RegistryConfig{
+               Type: NAMINGSERVER,
+               NamingServer: NamingServerConfig{
+                       ServerAddr:                  "custom-server:8080",
+                       Username:                    "init-test-user",
+                       Password:                    "init-test-pass",
+                       HeartbeatPeriod:             2000,
+                       TokenValidityInMilliseconds: 600000,
+               },
+       }
+
+       // Initialize registry with custom config
+       InitRegistry(nil, customConfig)
+
+       // Verify registry initialization
+       registry := GetRegistry()
+       if registry == nil {
+               t.Fatal("InitRegistry failed: registry is nil")
+       }
+
+       // Check registry type
+       namingRegistry, ok := registry.(*NamingServerRegistryService)
+       if !ok {
+               t.Fatalf("Expected NamingServerRegistryService, got %T", 
registry)
+       }
+
+       // Verify client uses custom config
+       client := namingRegistry.client
+       if client.config.ServerAddr != "custom-server:8080" {
+               t.Errorf("Client ServerAddr: expected 'custom-server:8080', got 
%s", client.config.ServerAddr)
+       }
+       if client.config.Username != "init-test-user" {
+               t.Errorf("Client Username: expected 'init-test-user', got %s", 
client.config.Username)
+       }
+       if client.config.Password != "init-test-pass" {
+               t.Errorf("Client Password: expected 'init-test-pass', got %s", 
client.config.Password)
+       }
+       if client.config.HeartbeatPeriod != 2000 {
+               t.Errorf("Client HeartbeatPeriod: expected 2000, got %d", 
client.config.HeartbeatPeriod)
+       }
+       if client.config.TokenValidityInMilliseconds != 600000 {
+               t.Errorf("Client TokenValidity: expected 600000, got %d", 
client.config.TokenValidityInMilliseconds)
+       }
+}
diff --git a/pkg/discovery/base.go b/pkg/util/rand/rand.go
similarity index 68%
copy from pkg/discovery/base.go
copy to pkg/util/rand/rand.go
index fcc0d016..2fa771a4 100644
--- a/pkg/discovery/base.go
+++ b/pkg/util/rand/rand.go
@@ -15,25 +15,24 @@
  * limitations under the License.
  */
 
-package discovery
+package rand
 
-const (
-       FILE   string = "file"
-       NACOS  string = "nacos"
-       ETCD   string = "etcd"
-       EUREKA string = "eureka"
-       REDIS  string = "redis"
-       ZK     string = "zk"
-       CONSUL string = "consul"
-       SOFA   string = "sofa"
+import (
+       "math/rand"
+       "sync"
+       "time"
 )
 
-type ServiceInstance struct {
-       Addr string
-       Port int
-}
+var (
+       randGen = rand.New(rand.NewSource(time.Now().UnixNano()))
+       mu      sync.Mutex // guards randGen; a *rand.Rand built with rand.New is not safe for concurrent use
+)
 
-type RegistryService interface {
-       Lookup(key string) ([]*ServiceInstance, error)
-       Close()
+// RandIntn returns a pseudo-random int in [0, n) drawn from the package's
+// shared generator. It returns 0 when n <= 0 rather than letting Intn panic.
+// Access to the generator is serialized with mu.
+func RandIntn(n int) int {
+       if n > 0 {
+               mu.Lock()
+               defer mu.Unlock()
+               return randGen.Intn(n)
+       }
+       return 0
 }
diff --git a/testdata/conf/seatago.yml b/testdata/conf/seatago.yml
index fa8a7f82..d52b8456 100644
--- a/testdata/conf/seatago.yml
+++ b/testdata/conf/seatago.yml
@@ -120,7 +120,7 @@ seata:
       data-id: seata.properties
   # Registration Center
   registry:
-    type: file
+    type: naming-server
     file:
       name: seatago.yml
     nacos:
@@ -136,6 +136,15 @@ seata:
     etcd3:
       cluster: "default"
       server-addr: "http://localhost:2379"
+    naming-server:
+      cluster: "default"
+      server-addr: "127.0.0.1:8081"
+      namespace: "public"
+      heartbeat-period: 5000
+      metadata-max-age-ms: 30000
+      username: ""
+      password: ""
+      token-validity-in-milliseconds: 1740000
   log:
     exception-rate: 100
   tcc:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to