This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a880f51 Support file base node discovery (#928)
8a880f51 is described below

commit 8a880f51df7a9f843803b84ca8b6e9ff9d2acfca
Author: mrproliu <[email protected]>
AuthorDate: Mon Jan 12 17:01:55 2026 +0800

    Support file base node discovery (#928)
    
    * Support file base node discovery
    
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
    Co-authored-by: Gao Hongtao <[email protected]>
---
 banyand/metadata/client.go                         | 135 ++++-
 banyand/metadata/discovery/common/base.go          |  45 ++
 banyand/metadata/discovery/common/cache.go         | 216 ++++++++
 banyand/metadata/discovery/common/grpc.go          |  61 ++
 banyand/metadata/discovery/common/retry.go         | 284 ++++++++++
 banyand/metadata/{ => discovery}/dns/dns.go        | 325 +++++------
 banyand/metadata/{ => discovery}/dns/dns_test.go   |   7 +-
 .../metadata/{ => discovery}/dns/export_test.go    |  12 +-
 banyand/metadata/{ => discovery}/dns/metrics.go    |  32 ++
 .../{ => discovery}/dns/testdata/ca_cert.pem       |   0
 .../{ => discovery}/dns/testdata/server_key.pem    |   0
 banyand/metadata/discovery/file/file.go            | 466 ++++++++++++++++
 banyand/metadata/discovery/file/file_test.go       | 616 +++++++++++++++++++++
 banyand/metadata/discovery/file/metrics.go         |  70 +++
 docs/operation/node-discovery.md                   | 128 ++++-
 15 files changed, 2190 insertions(+), 207 deletions(-)

diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index 0ee2b102..ae5a9d30 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -33,7 +33,8 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       "github.com/apache/skywalking-banyandb/banyand/metadata/dns"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/discovery/dns"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/discovery/file"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -52,6 +53,8 @@ const (
        NodeDiscoveryModeEtcd = "etcd"
        // NodeDiscoveryModeDNS represents DNS-based node discovery mode.
        NodeDiscoveryModeDNS = "dns"
+       // NodeDiscoveryModeFile represents file-based node discovery mode.
+       NodeDiscoveryModeFile = "file"
 )
 
 const flagEtcdUsername = "etcd-username"
@@ -74,30 +77,36 @@ func NewClient(toRegisterNode, forceRegisterNode bool) 
(Service, error) {
 }
 
 type clientService struct {
-       schemaRegistry       schema.Registry
-       dnsDiscovery         *dns.Service
-       closer               *run.Closer
-       nodeInfo             *databasev1.Node
-       etcdTLSCertFile      string
-       dnsCACertPaths       []string
-       etcdPassword         string
-       etcdTLSCAFile        string
-       etcdUsername         string
-       etcdTLSKeyFile       string
-       namespace            string
-       nodeDiscoveryMode    string
-       dnsSRVAddresses      []string
-       endpoints            []string
-       registryTimeout      time.Duration
-       dnsFetchInitInterval time.Duration
-       dnsFetchInitDuration time.Duration
-       dnsFetchInterval     time.Duration
-       grpcTimeout          time.Duration
-       etcdFullSyncInterval time.Duration
-       nodeInfoMux          sync.Mutex
-       forceRegisterNode    bool
-       toRegisterNode       bool
-       dnsTLSEnabled        bool
+       schemaRegistry           schema.Registry
+       dnsDiscovery             *dns.Service
+       fileDiscovery            *file.Service
+       closer                   *run.Closer
+       nodeInfo                 *databasev1.Node
+       etcdTLSCertFile          string
+       dnsCACertPaths           []string
+       etcdPassword             string
+       etcdTLSCAFile            string
+       etcdUsername             string
+       etcdTLSKeyFile           string
+       namespace                string
+       nodeDiscoveryMode        string
+       filePath                 string
+       dnsSRVAddresses          []string
+       endpoints                []string
+       registryTimeout          time.Duration
+       dnsFetchInitInterval     time.Duration
+       dnsFetchInitDuration     time.Duration
+       dnsFetchInterval         time.Duration
+       grpcTimeout              time.Duration
+       etcdFullSyncInterval     time.Duration
+       fileFetchInterval        time.Duration
+       fileRetryInitialInterval time.Duration
+       fileRetryMaxInterval     time.Duration
+       fileRetryMultiplier      float64
+       nodeInfoMux              sync.Mutex
+       forceRegisterNode        bool
+       toRegisterNode           bool
+       dnsTLSEnabled            bool
 }
 
 func (s *clientService) SchemaRegistry() schema.Registry {
@@ -118,7 +127,7 @@ func (s *clientService) FlagSet() *run.FlagSet {
 
        // node discovery configuration
        fs.StringVar(&s.nodeDiscoveryMode, "node-discovery-mode", 
NodeDiscoveryModeEtcd,
-               "Node discovery mode: 'etcd' for etcd-based discovery, 'dns' 
for DNS-based discovery")
+               "Node discovery mode: 'etcd' for etcd-based discovery, 'dns' 
for DNS-based discovery, 'file' for file-based discovery")
        fs.StringSliceVar(&s.dnsSRVAddresses, 
"node-discovery-dns-srv-addresses", []string{},
                "DNS SRV addresses for node discovery (e.g., 
_grpc._tcp.banyandb.svc.cluster.local)")
        fs.DurationVar(&s.dnsFetchInitInterval, 
"node-discovery-dns-fetch-init-interval", 5*time.Second,
@@ -133,13 +142,25 @@ func (s *clientService) FlagSet() *run.FlagSet {
                "Enable TLS for DNS discovery gRPC connections")
        fs.StringSliceVar(&s.dnsCACertPaths, "node-discovery-dns-ca-certs", 
[]string{},
                "Comma-separated list of CA certificate files to verify DNS 
discovered nodes (one per SRV address, in same order)")
+       fs.StringVar(&s.filePath, "node-discovery-file-path", "",
+               "File path for static node configuration (file mode only)")
+       fs.DurationVar(&s.fileFetchInterval, 
"node-discovery-file-fetch-interval", 5*time.Minute,
+               "Interval to poll the discovery file in file discovery mode 
(fallback mechanism)")
+       fs.DurationVar(&s.fileRetryInitialInterval, 
"node-discovery-file-retry-initial-interval", 1*time.Second,
+               "Initial retry interval for failed node metadata fetches in 
file discovery mode")
+       fs.DurationVar(&s.fileRetryMaxInterval, 
"node-discovery-file-retry-max-interval", 2*time.Minute,
+               "Maximum retry interval for failed node metadata fetches in 
file discovery mode")
+       fs.Float64Var(&s.fileRetryMultiplier, 
"node-discovery-file-retry-multiplier", 2.0,
+               "Backoff multiplier for retry intervals in file discovery mode")
 
        return fs
 }
 
 func (s *clientService) Validate() error {
-       if s.nodeDiscoveryMode != NodeDiscoveryModeEtcd && s.nodeDiscoveryMode 
!= NodeDiscoveryModeDNS {
-               return fmt.Errorf("invalid node-discovery-mode: %s, must be 
'%s' or '%s'", s.nodeDiscoveryMode, NodeDiscoveryModeEtcd, NodeDiscoveryModeDNS)
+       if s.nodeDiscoveryMode != NodeDiscoveryModeEtcd && s.nodeDiscoveryMode 
!= NodeDiscoveryModeDNS &&
+               s.nodeDiscoveryMode != NodeDiscoveryModeFile {
+               return fmt.Errorf("invalid node-discovery-mode: %s, must be 
'%s', '%s', or '%s'",
+                       s.nodeDiscoveryMode, NodeDiscoveryModeEtcd, 
NodeDiscoveryModeDNS, NodeDiscoveryModeFile)
        }
 
        // Validate etcd endpoints (always required for schema storage, 
regardless of node discovery mode)
@@ -163,6 +184,16 @@ func (s *clientService) Validate() error {
                }
        }
 
+       // Validate file mode specific requirements
+       if s.nodeDiscoveryMode == NodeDiscoveryModeFile {
+               if s.filePath == "" {
+                       return errors.New("file mode requires non-empty file 
path")
+               }
+               if _, err := os.Stat(s.filePath); err != nil {
+                       return fmt.Errorf("file path validation failed: %w", 
err)
+               }
+       }
+
        return nil
 }
 
@@ -230,8 +261,26 @@ func (s *clientService) PreRun(ctx context.Context) error {
                }
        }
 
-       // skip node registration if DNS mode is enabled or node registration 
is disabled
-       if !s.toRegisterNode || s.nodeDiscoveryMode == NodeDiscoveryModeDNS {
+       if s.nodeDiscoveryMode == NodeDiscoveryModeFile {
+               l.Info().Str("file-path", s.filePath).Msg("Initializing 
file-based node discovery")
+
+               var createErr error
+               s.fileDiscovery, createErr = file.NewService(file.Config{
+                       FilePath:             s.filePath,
+                       GRPCTimeout:          s.grpcTimeout,
+                       FetchInterval:        s.fileFetchInterval,
+                       RetryInitialInterval: s.fileRetryInitialInterval,
+                       RetryMaxInterval:     s.fileRetryMaxInterval,
+                       RetryMultiplier:      s.fileRetryMultiplier,
+               })
+               if createErr != nil {
+                       return fmt.Errorf("failed to create file discovery 
service: %w", createErr)
+               }
+       }
+
+       // skip node registration if DNS/file mode is enabled or node 
registration is disabled
+       if !s.toRegisterNode || s.nodeDiscoveryMode == NodeDiscoveryModeDNS ||
+               s.nodeDiscoveryMode == NodeDiscoveryModeFile {
                return nil
        }
        val := ctx.Value(common.ContextNodeKey)
@@ -299,6 +348,13 @@ func (s *clientService) Serve() run.StopNotify {
                }
        }
 
+       // Start file discovery
+       if s.fileDiscovery != nil {
+               if err := s.fileDiscovery.Start(s.closer.Ctx()); err != nil {
+                       logger.GetLogger(s.Name()).Error().Err(err).Msg("failed 
to start file discovery")
+               }
+       }
+
        return s.closer.CloseNotify()
 }
 
@@ -312,6 +368,12 @@ func (s *clientService) GracefulStop() {
                }
        }
 
+       if s.fileDiscovery != nil {
+               if err := s.fileDiscovery.Close(); err != nil {
+                       logger.GetLogger(s.Name()).Error().Err(err).Msg("failed 
to close file discovery")
+               }
+       }
+
        if s.schemaRegistry != nil {
                if err := s.schemaRegistry.Close(); err != nil {
                        logger.GetLogger(s.Name()).Error().Err(err).Msg("failed 
to close schema registry")
@@ -320,6 +382,10 @@ func (s *clientService) GracefulStop() {
 }
 
 func (s *clientService) RegisterHandler(name string, kind schema.Kind, handler 
schema.EventHandler) {
+       if kind == schema.KindNode && s.fileDiscovery != nil {
+               s.fileDiscovery.RegisterHandler(name, handler)
+               return
+       }
        if kind == schema.KindNode && s.dnsDiscovery != nil {
                s.dnsDiscovery.RegisterHandler(name, handler)
                return
@@ -356,6 +422,10 @@ func (s *clientService) TopNAggregationRegistry() 
schema.TopNAggregation {
 }
 
 func (s *clientService) NodeRegistry() schema.Node {
+       // If file discovery is enabled, use it instead of etcd
+       if s.fileDiscovery != nil {
+               return s.fileDiscovery
+       }
        // If DNS discovery is enabled, use it instead of etcd
        if s.dnsDiscovery != nil {
                return s.dnsDiscovery
@@ -374,6 +444,11 @@ func (s *clientService) SetMetricsRegistry(omr 
observability.MetricsRegistry) {
                factory := 
observability.RootScope.SubScope("metadata").SubScope("dns_discovery")
                s.dnsDiscovery.SetMetrics(omr.With(factory))
        }
+       // initialize file discovery with metrics if it exists
+       if s.fileDiscovery != nil {
+               factory := 
observability.RootScope.SubScope("metadata").SubScope("file_discovery")
+               s.fileDiscovery.SetMetrics(omr.With(factory))
+       }
 }
 
 func (s *clientService) Name() string {
diff --git a/banyand/metadata/discovery/common/base.go 
b/banyand/metadata/discovery/common/base.go
new file mode 100644
index 00000000..9ddfe95f
--- /dev/null
+++ b/banyand/metadata/discovery/common/base.go
@@ -0,0 +1,45 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package common
+
+// DiscoveryServiceBase combines NodeCacheBase and RetryManager for discovery 
services.
+type DiscoveryServiceBase struct {
+       *NodeCacheBase
+       *RetryManager
+}
+
+// NewServiceBase creates a new base service for gRPC-based and file-based 
service.
+func NewServiceBase(
+       loggerName string,
+       fetcher NodeFetcher,
+       retryConfig RetryConfig,
+) *DiscoveryServiceBase {
+       cacheBase := NewNodeCacheBase(loggerName)
+       retryManager := NewRetryManager(fetcher, cacheBase, retryConfig, nil)
+       return &DiscoveryServiceBase{
+               NodeCacheBase: cacheBase,
+               RetryManager:  retryManager,
+       }
+}
+
+// SetMetrics sets metrics for the retry manager.
+func (b *DiscoveryServiceBase) SetMetrics(metrics RetryMetrics) {
+       if b.RetryManager != nil {
+               b.RetryManager.SetMetrics(metrics)
+       }
+}
diff --git a/banyand/metadata/discovery/common/cache.go 
b/banyand/metadata/discovery/common/cache.go
new file mode 100644
index 00000000..2dfc268c
--- /dev/null
+++ b/banyand/metadata/discovery/common/cache.go
@@ -0,0 +1,216 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package common provides shared functionality for node discovery 
implementations.
+package common
+
+import (
+       "context"
+       "fmt"
+       "sync"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// NodeCacheBase provides common node cache management functionality for 
discovery services.
+type NodeCacheBase struct {
+       nodeCache     map[string]*databasev1.Node
+       log           *logger.Logger
+       handlers      []schema.EventHandler
+       cacheMutex    sync.RWMutex
+       handlersMutex sync.RWMutex
+}
+
+// NewNodeCacheBase creates a new base with initialized maps.
+func NewNodeCacheBase(loggerName string) *NodeCacheBase {
+       return &NodeCacheBase{
+               nodeCache: make(map[string]*databasev1.Node),
+               handlers:  make([]schema.EventHandler, 0),
+               log:       logger.GetLogger(loggerName),
+       }
+}
+
+// GetLogger returns the logger.
+func (b *NodeCacheBase) GetLogger() *logger.Logger {
+       return b.log
+}
+
+// ListNode lists all nodes from cache, filtering by role.
+func (b *NodeCacheBase) ListNode(_ context.Context, role databasev1.Role) 
([]*databasev1.Node, error) {
+       b.cacheMutex.RLock()
+       defer b.cacheMutex.RUnlock()
+
+       var result []*databasev1.Node
+       for _, node := range b.nodeCache {
+               if role != databasev1.Role_ROLE_UNSPECIFIED {
+                       hasRole := false
+                       for _, nodeRole := range node.GetRoles() {
+                               if nodeRole == role {
+                                       hasRole = true
+                                       break
+                               }
+                       }
+                       if !hasRole {
+                               continue
+                       }
+               }
+               result = append(result, node)
+       }
+       return result, nil
+}
+
+// GetNode retrieves a specific node by name.
+func (b *NodeCacheBase) GetNode(_ context.Context, nodeName string) 
(*databasev1.Node, error) {
+       b.cacheMutex.RLock()
+       defer b.cacheMutex.RUnlock()
+
+       for _, node := range b.nodeCache {
+               if node.GetMetadata() != nil && node.GetMetadata().GetName() == 
nodeName {
+                       return node, nil
+               }
+       }
+       return nil, fmt.Errorf("node %s not found", nodeName)
+}
+
+// RegisterHandler registers an event handler for node changes.
+func (b *NodeCacheBase) RegisterHandler(name string, handler 
schema.EventHandler) {
+       b.handlersMutex.Lock()
+       defer b.handlersMutex.Unlock()
+
+       b.handlers = append(b.handlers, handler)
+       b.log.Debug().Str("handler", name).Msg("Registered node discovery 
handler")
+}
+
+// NotifyHandlers notifies all registered handlers of node changes.
+func (b *NodeCacheBase) NotifyHandlers(metadata schema.Metadata, isAddOrUpdate 
bool) {
+       b.handlersMutex.RLock()
+       defer b.handlersMutex.RUnlock()
+
+       for _, handler := range b.handlers {
+               if isAddOrUpdate {
+                       handler.OnAddOrUpdate(metadata)
+               } else {
+                       handler.OnDelete(metadata)
+               }
+       }
+}
+
+// AddNode adds a node to the cache.
+// Returns true if the node was added, false if it already existed.
+func (b *NodeCacheBase) AddNode(address string, node *databasev1.Node) bool {
+       b.cacheMutex.Lock()
+       defer b.cacheMutex.Unlock()
+
+       if _, exists := b.nodeCache[address]; exists {
+               return false
+       }
+       b.nodeCache[address] = node
+       return true
+}
+
+// GetCachedNode retrieves a node from cache by address.
+// Returns the node and true if found, nil and false otherwise.
+func (b *NodeCacheBase) GetCachedNode(address string) (*databasev1.Node, bool) 
{
+       b.cacheMutex.RLock()
+       defer b.cacheMutex.RUnlock()
+       node, exists := b.nodeCache[address]
+       return node, exists
+}
+
+// GetCacheSize returns the current number of nodes in the cache.
+func (b *NodeCacheBase) GetCacheSize() int {
+       b.cacheMutex.RLock()
+       defer b.cacheMutex.RUnlock()
+       return len(b.nodeCache)
+}
+
+// GetAllNodeAddresses returns all addresses currently in the cache.
+func (b *NodeCacheBase) GetAllNodeAddresses() []string {
+       b.cacheMutex.RLock()
+       defer b.cacheMutex.RUnlock()
+
+       addresses := make([]string, 0, len(b.nodeCache))
+       for addr := range b.nodeCache {
+               addresses = append(addresses, addr)
+       }
+       return addresses
+}
+
+// RemoveNodes removes multiple nodes from the cache.
+// Returns a map of removed nodes keyed by address.
+func (b *NodeCacheBase) RemoveNodes(addresses []string) 
map[string]*databasev1.Node {
+       b.cacheMutex.Lock()
+       defer b.cacheMutex.Unlock()
+
+       removed := make(map[string]*databasev1.Node)
+       for _, addr := range addresses {
+               if node, exists := b.nodeCache[addr]; exists {
+                       delete(b.nodeCache, addr)
+                       removed[addr] = node
+               }
+       }
+       return removed
+}
+
+// AddNodeAndNotify adds a node to the cache and notifies handlers if added 
successfully.
+// Returns true if the node was added, false if it already existed.
+func (b *NodeCacheBase) AddNodeAndNotify(address string, node 
*databasev1.Node, logMsg string) bool {
+       if added := b.AddNode(address, node); added {
+               // notify handlers
+               b.NotifyHandlers(schema.Metadata{
+                       TypeMeta: schema.TypeMeta{
+                               Kind: schema.KindNode,
+                               Name: node.GetMetadata().GetName(),
+                       },
+                       Spec: node,
+               }, true)
+
+               b.log.Debug().
+                       Str("address", address).
+                       Str("name", node.GetMetadata().GetName()).
+                       Msg(logMsg)
+
+               return true
+       }
+       return false
+}
+
+// RemoveNodesAndNotify removes nodes and notifies handlers for each removed 
node.
+// Returns a map of removed nodes keyed by address.
+func (b *NodeCacheBase) RemoveNodesAndNotify(addresses []string, logMsg 
string) map[string]*databasev1.Node {
+       removed := b.RemoveNodes(addresses)
+
+       // notify handlers for deletions
+       for addr, node := range removed {
+               b.NotifyHandlers(schema.Metadata{
+                       TypeMeta: schema.TypeMeta{
+                               Kind: schema.KindNode,
+                               Name: node.GetMetadata().GetName(),
+                       },
+                       Spec: node,
+               }, false)
+
+               b.log.Debug().
+                       Str("address", addr).
+                       Str("name", node.GetMetadata().GetName()).
+                       Msg(logMsg)
+       }
+
+       return removed
+}
diff --git a/banyand/metadata/discovery/common/grpc.go 
b/banyand/metadata/discovery/common/grpc.go
new file mode 100644
index 00000000..198950ce
--- /dev/null
+++ b/banyand/metadata/discovery/common/grpc.go
@@ -0,0 +1,61 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package common
+
+import (
+       "context"
+       "fmt"
+       "time"
+
+       "google.golang.org/grpc"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+)
+
+// GRPCDialOptionsProvider provides gRPC dial options for TLS configuration.
+type GRPCDialOptionsProvider interface {
+       GetDialOptions(address string) ([]grpc.DialOption, error)
+}
+
+// FetchNodeMetadata fetches node metadata via gRPC.
+// This is the common implementation used by both DNS and file discovery.
+func FetchNodeMetadata(ctx context.Context, address string, timeout 
time.Duration, dialOptsProvider GRPCDialOptionsProvider) (*databasev1.Node, 
error) {
+       ctxTimeout, cancel := context.WithTimeout(ctx, timeout)
+       defer cancel()
+
+       dialOpts, err := dialOptsProvider.GetDialOptions(address)
+       if err != nil {
+               return nil, fmt.Errorf("failed to get dial options for %s: %w", 
address, err)
+       }
+
+       // nolint:contextcheck
+       conn, connErr := grpchelper.Conn(address, timeout, dialOpts...)
+       if connErr != nil {
+               return nil, fmt.Errorf("failed to connect to %s: %w", address, 
connErr)
+       }
+       defer conn.Close()
+
+       client := databasev1.NewNodeQueryServiceClient(conn)
+       resp, callErr := client.GetCurrentNode(ctxTimeout, 
&databasev1.GetCurrentNodeRequest{})
+       if callErr != nil {
+               return nil, fmt.Errorf("failed to get current node from %s: 
%w", address, callErr)
+       }
+
+       return resp.GetNode(), nil
+}
diff --git a/banyand/metadata/discovery/common/retry.go 
b/banyand/metadata/discovery/common/retry.go
new file mode 100644
index 00000000..b419c245
--- /dev/null
+++ b/banyand/metadata/discovery/common/retry.go
@@ -0,0 +1,284 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package common
+
+import (
+       "context"
+       "sync"
+       "time"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// NodeFetcher defines the interface for fetching node metadata.
+type NodeFetcher interface {
+       // FetchNodeWithRetry attempts to fetch node metadata for the given 
address.
+       FetchNodeWithRetry(ctx context.Context, address string) 
(*databasev1.Node, error)
+}
+
+// RetryMetrics defines metrics interface for retry operations.
+type RetryMetrics interface {
+       IncRetryCount()
+       IncRetrySuccess()
+       IncRetryFailed()
+       SetQueueSize(size float64)
+}
+
+// RetryConfig holds retry backoff configuration.
+type RetryConfig struct {
+       InitialInterval time.Duration
+       MaxInterval     time.Duration
+       Multiplier      float64
+}
+
+// RetryState tracks backoff retry state for a single node.
+type RetryState struct {
+       nextRetryTime  time.Time
+       lastError      error
+       address        string
+       attemptCount   int
+       currentBackoff time.Duration
+}
+
+// RetryManager manages retry queue and scheduling for failed node fetches.
+type RetryManager struct {
+       fetcher       NodeFetcher
+       metrics       RetryMetrics
+       cacheBase     *NodeCacheBase
+       retryQueue    map[string]*RetryState
+       log           *logger.Logger
+       onSuccess     func(address string, node *databasev1.Node)
+       onAddedToNode func(address string)
+       config        RetryConfig
+       retryMutex    sync.RWMutex
+}
+
+// NewRetryManager creates a new retry manager.
+func NewRetryManager(
+       fetcher NodeFetcher,
+       cacheBase *NodeCacheBase,
+       config RetryConfig,
+       metrics RetryMetrics,
+) *RetryManager {
+       return &RetryManager{
+               fetcher:    fetcher,
+               cacheBase:  cacheBase,
+               retryQueue: make(map[string]*RetryState),
+               config:     config,
+               metrics:    metrics,
+               log:        cacheBase.GetLogger(),
+       }
+}
+
+// SetSuccessCallback sets the callback to be called when a node is 
successfully fetched after retry.
+func (r *RetryManager) SetSuccessCallback(callback func(address string, node 
*databasev1.Node)) {
+       r.onSuccess = callback
+}
+
+// SetAddedToNodeCallback sets the callback to be called when a node is added 
to cache.
+func (r *RetryManager) SetAddedToNodeCallback(callback func(address string)) {
+       r.onAddedToNode = callback
+}
+
+// SetMetrics sets the metrics for the retry manager.
+func (r *RetryManager) SetMetrics(metrics RetryMetrics) {
+       r.metrics = metrics
+}
+
+// AddToRetry adds a failed node fetch to the retry queue.
+func (r *RetryManager) AddToRetry(address string, fetchErr error) {
+       r.retryMutex.Lock()
+       defer r.retryMutex.Unlock()
+
+       r.retryQueue[address] = &RetryState{
+               address:        address,
+               attemptCount:   1,
+               nextRetryTime:  time.Now().Add(r.config.InitialInterval),
+               currentBackoff: r.config.InitialInterval,
+               lastError:      fetchErr,
+       }
+
+       if r.metrics != nil {
+               r.metrics.IncRetryCount()
+               r.metrics.SetQueueSize(float64(len(r.retryQueue)))
+       }
+
+       r.log.Warn().
+               Err(fetchErr).
+               Str("address", address).
+               Msg("Failed to fetch node metadata, adding to retry queue")
+}
+
+// RemoveFromRetry removes an address from the retry queue.
+func (r *RetryManager) RemoveFromRetry(address string) {
+       r.retryMutex.Lock()
+       defer r.retryMutex.Unlock()
+
+       delete(r.retryQueue, address)
+       if r.metrics != nil {
+               r.metrics.SetQueueSize(float64(len(r.retryQueue)))
+       }
+
+       r.log.Debug().
+               Str("address", address).
+               Msg("Removed node from retry queue")
+}
+
+// IsInRetry checks if an address is in the retry queue.
+func (r *RetryManager) IsInRetry(address string) bool {
+       r.retryMutex.RLock()
+       defer r.retryMutex.RUnlock()
+       _, exists := r.retryQueue[address]
+       return exists
+}
+
+// GetQueueSize returns the current retry queue size.
+func (r *RetryManager) GetQueueSize() int {
+       r.retryMutex.RLock()
+       defer r.retryMutex.RUnlock()
+       return len(r.retryQueue)
+}
+
+// ProcessRetryQueue processes nodes that are ready for retry.
+func (r *RetryManager) ProcessRetryQueue(ctx context.Context) {
+       now := time.Now()
+
+       r.retryMutex.RLock()
+       nodesToRetry := make([]*RetryState, 0)
+       for _, retryState := range r.retryQueue {
+               if retryState.nextRetryTime.Before(now) || 
retryState.nextRetryTime.Equal(now) {
+                       nodesToRetry = append(nodesToRetry, retryState)
+               }
+       }
+       r.retryMutex.RUnlock()
+
+       if len(nodesToRetry) == 0 {
+               return
+       }
+
+       r.log.Debug().Int("count", len(nodesToRetry)).Msg("Processing retry 
queue")
+
+       for _, retryState := range nodesToRetry {
+               r.attemptNodeRetry(ctx, retryState)
+       }
+}
+
+func (r *RetryManager) attemptNodeRetry(ctx context.Context, retryState 
*RetryState) {
+       addr := retryState.address
+
+       r.log.Debug().
+               Str("address", addr).
+               Int("attempt", retryState.attemptCount).
+               Dur("backoff", retryState.currentBackoff).
+               Msg("Attempting node metadata fetch retry")
+
+       // try to fetch metadata
+       node, fetchErr := r.fetcher.FetchNodeWithRetry(ctx, addr)
+
+       if fetchErr != nil {
+               // retry failed - update backoff
+               retryState.attemptCount++
+               retryState.lastError = fetchErr
+
+               // calculate next backoff interval
+               nextBackoff := time.Duration(float64(retryState.currentBackoff) 
* r.config.Multiplier)
+               if nextBackoff > r.config.MaxInterval {
+                       nextBackoff = r.config.MaxInterval
+               }
+               retryState.currentBackoff = nextBackoff
+               retryState.nextRetryTime = time.Now().Add(nextBackoff)
+
+               r.log.Warn().
+                       Err(fetchErr).
+                       Str("address", addr).
+                       Int("attempt", retryState.attemptCount).
+                       Dur("next_backoff", nextBackoff).
+                       Time("next_retry", retryState.nextRetryTime).
+                       Msg("Node metadata fetch retry failed, scheduling next 
retry")
+
+               r.retryMutex.Lock()
+               r.retryQueue[addr] = retryState
+               r.retryMutex.Unlock()
+
+               if r.metrics != nil {
+                       r.metrics.IncRetryFailed()
+               }
+               return
+       }
+
+       // success - add to cache
+       r.cacheBase.AddNode(addr, node)
+
+       // remove from retry queue
+       r.retryMutex.Lock()
+       delete(r.retryQueue, addr)
+       queueSize := len(r.retryQueue)
+       r.retryMutex.Unlock()
+
+       // notify handlers
+       r.cacheBase.NotifyHandlers(schema.Metadata{
+               TypeMeta: schema.TypeMeta{
+                       Kind: schema.KindNode,
+                       Name: node.GetMetadata().GetName(),
+               },
+               Spec: node,
+       }, true)
+
+       r.log.Info().
+               Str("address", addr).
+               Str("name", node.GetMetadata().GetName()).
+               Int("total_attempts", retryState.attemptCount).
+               Msg("Node metadata fetched successfully after retry")
+
+       // update metrics
+       if r.metrics != nil {
+               r.metrics.IncRetrySuccess()
+               r.metrics.SetQueueSize(float64(queueSize))
+       }
+
+       // call success callback if set
+       if r.onSuccess != nil {
+               r.onSuccess(addr, node)
+       }
+
+       // call added to node callback if set
+       if r.onAddedToNode != nil {
+               r.onAddedToNode(addr)
+       }
+}
+
+// CleanupRetryQueue removes addresses from retry queue that are not in the 
provided set.
+func (r *RetryManager) CleanupRetryQueue(validAddresses map[string]bool) {
+       r.retryMutex.Lock()
+       defer r.retryMutex.Unlock()
+
+       for addr := range r.retryQueue {
+               if !validAddresses[addr] {
+                       delete(r.retryQueue, addr)
+                       r.log.Debug().
+                               Str("address", addr).
+                               Msg("Removed node from retry queue (no longer 
valid)")
+               }
+       }
+
+       if r.metrics != nil {
+               r.metrics.SetQueueSize(float64(len(r.retryQueue)))
+       }
+}
diff --git a/banyand/metadata/dns/dns.go b/banyand/metadata/discovery/dns/dns.go
similarity index 70%
rename from banyand/metadata/dns/dns.go
rename to banyand/metadata/discovery/dns/dns.go
index 2664f5df..fe472f04 100644
--- a/banyand/metadata/dns/dns.go
+++ b/banyand/metadata/discovery/dns/dns.go
@@ -31,25 +31,23 @@ import (
        "google.golang.org/grpc/credentials/insecure"
 
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       
"github.com/apache/skywalking-banyandb/banyand/metadata/discovery/common"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
-       "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
        pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
 )
 
 // Service implements DNS-based node discovery.
 type Service struct {
+       *common.DiscoveryServiceBase
        lastQueryTime     time.Time
        resolver          Resolver
        pathToReloader    map[string]*pkgtls.Reloader
        srvAddrToPath     map[string]string
-       nodeCache         map[string]*databasev1.Node
+       addressToSrvAddr  map[string]string
        closer            *run.Closer
-       log               *logger.Logger
        metrics           *metrics
-       handlers          []schema.EventHandler
        lastSuccessfulDNS map[string][]string
        srvAddresses      []string
        caCertPaths       []string
@@ -57,21 +55,23 @@ type Service struct {
        initInterval      time.Duration
        initDuration      time.Duration
        grpcTimeout       time.Duration
-       cacheMutex        sync.RWMutex
-       handlersMutex     sync.RWMutex
        lastQueryMutex    sync.RWMutex
+       addressMutex      sync.RWMutex
        tlsEnabled        bool
 }
 
 // Config holds configuration for DNS discovery service.
 type Config struct {
-       CACertPaths  []string
-       SRVAddresses []string
-       InitInterval time.Duration
-       InitDuration time.Duration
-       PollInterval time.Duration
-       GRPCTimeout  time.Duration
-       TLSEnabled   bool
+       CACertPaths          []string
+       SRVAddresses         []string
+       InitInterval         time.Duration
+       InitDuration         time.Duration
+       PollInterval         time.Duration
+       GRPCTimeout          time.Duration
+       TLSEnabled           bool
+       RetryInitialInterval time.Duration
+       RetryMaxInterval     time.Duration
+       RetryMultiplier      float64
 }
 
 // Resolver defines the interface for DNS SRV lookups.
@@ -109,16 +109,25 @@ func NewService(cfg Config) (*Service, error) {
                grpcTimeout:       cfg.GRPCTimeout,
                tlsEnabled:        cfg.TLSEnabled,
                caCertPaths:       cfg.CACertPaths,
-               nodeCache:         make(map[string]*databasev1.Node),
-               handlers:          make([]schema.EventHandler, 0),
                lastSuccessfulDNS: make(map[string][]string),
                pathToReloader:    make(map[string]*pkgtls.Reloader),
                srvAddrToPath:     make(map[string]string),
-               closer:            run.NewCloser(1),
-               log:               logger.GetLogger("metadata-discovery-dns"),
+               addressToSrvAddr:  make(map[string]string),
+               closer:            run.NewCloser(2),
                resolver:          &defaultResolver{},
        }
 
+       // initialize discovery service base (cache + retry manager)
+       svc.DiscoveryServiceBase = common.NewServiceBase(
+               "metadata-discovery-dns",
+               svc,
+               common.RetryConfig{
+                       InitialInterval: cfg.RetryInitialInterval,
+                       MaxInterval:     cfg.RetryMaxInterval,
+                       Multiplier:      cfg.RetryMultiplier,
+               },
+       )
+
        // create shared reloaders for CA certificates
        if svc.tlsEnabled {
                for srvIdx, certPath := range cfg.CACertPaths {
@@ -127,13 +136,13 @@ func NewService(cfg Config) (*Service, error) {
 
                        // check if we already have a Reloader for this path
                        if _, exists := svc.pathToReloader[certPath]; exists {
-                               svc.log.Debug().Str("certPath", 
certPath).Int("srvIndex", srvIdx).
+                               svc.GetLogger().Debug().Str("certPath", 
certPath).Int("srvIndex", srvIdx).
                                        Msg("Reusing existing CA certificate 
reloader")
                                continue
                        }
 
                        // create new Reloader for this unique path
-                       reloader, reloaderErr := 
pkgtls.NewClientCertReloader(certPath, svc.log)
+                       reloader, reloaderErr := 
pkgtls.NewClientCertReloader(certPath, svc.GetLogger())
                        if reloaderErr != nil {
                                // clean up any already-created reloaders
                                for _, r := range svc.pathToReloader {
@@ -144,7 +153,7 @@ func NewService(cfg Config) (*Service, error) {
                        }
 
                        svc.pathToReloader[certPath] = reloader
-                       svc.log.Info().Str("certPath", 
certPath).Int("srvIndex", srvIdx).
+                       svc.GetLogger().Info().Str("certPath", 
certPath).Int("srvIndex", srvIdx).
                                Str("srvAddress", srvAddr).Msg("Initialized DNS 
CA certificate reloader")
                }
        }
@@ -162,6 +171,60 @@ func newServiceWithResolver(cfg Config, resolver Resolver) (*Service, error) {
        return svc, nil
 }
 
+// GetDialOptions implements GRPCDialOptionsProvider for DNS-specific TLS setup.
+func (s *Service) GetDialOptions(address string) ([]grpc.DialOption, error) {
+       if !s.tlsEnabled {
+               return []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, nil
+       }
+
+       // find which SRV address this node address belongs to
+       s.addressMutex.RLock()
+       srvAddr, found := s.addressToSrvAddr[address]
+       s.addressMutex.RUnlock()
+
+       if !found {
+               // fallback: use any available SRV for backward compatibility
+               for srv := range s.srvAddrToPath {
+                       srvAddr = srv
+                       break
+               }
+               if srvAddr == "" {
+                       return nil, fmt.Errorf("no SRV address mapping found for %s", address)
+               }
+       }
+
+       // look up which Reloader to use for this address
+       if len(s.pathToReloader) > 0 {
+               // look up the cert path for this SRV address
+               certPath, pathExists := s.srvAddrToPath[srvAddr]
+               if !pathExists {
+                       return nil, fmt.Errorf("no cert path found for SRV %s (address %s)", srvAddr, address)
+               }
+
+               // get the Reloader for this cert path
+               reloader, reloaderExists := s.pathToReloader[certPath]
+               if !reloaderExists {
+                       return nil, fmt.Errorf("no reloader found for cert path %s (address %s)", certPath, address)
+               }
+
+               // get fresh TLS config from the Reloader
+               tlsConfig, configErr := reloader.GetClientTLSConfig("")
+               if configErr != nil {
+                       return nil, fmt.Errorf("failed to get TLS config from reloader for address %s: %w", address, configErr)
+               }
+
+               creds := credentials.NewTLS(tlsConfig)
+               return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, 
nil
+       }
+
+       // fallback to static TLS config (when no reloaders configured)
+       opts, err := grpchelper.SecureOptions(nil, s.tlsEnabled, false, "")
+       if err != nil {
+               return nil, fmt.Errorf("failed to load TLS config: %w", err)
+       }
+       return opts, nil
+}
+
 func (s *Service) getTLSDialOptions(srvAddr, address string) 
([]grpc.DialOption, error) {
        if !s.tlsEnabled {
                return 
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, nil
@@ -201,7 +264,7 @@ func (s *Service) getTLSDialOptions(srvAddr, address 
string) ([]grpc.DialOption,
 
 // Start begins the DNS discovery background process.
 func (s *Service) Start(ctx context.Context) error {
-       s.log.Debug().Msg("Starting DNS-based node discovery service")
+       s.GetLogger().Debug().Msg("Starting DNS-based node discovery service")
 
        // start all Reloaders
        if len(s.pathToReloader) > 0 {
@@ -216,22 +279,41 @@ func (s *Service) Start(ctx context.Context) error {
                                return fmt.Errorf("failed to start CA 
certificate reloader for path %s: %w", certPath, startErr)
                        }
                        startedReloaders = append(startedReloaders, reloader)
-                       s.log.Debug().Str("certPath", certPath).Msg("Started CA 
certificate reloader")
+                       s.GetLogger().Debug().Str("certPath", 
certPath).Msg("Started CA certificate reloader")
                }
        }
 
        go s.discoveryLoop(ctx)
+       go s.retryScheduler(ctx)
 
        return nil
 }
 
+func (s *Service) retryScheduler(ctx context.Context) {
+       // check retry queue every second
+       checkTicker := time.NewTicker(1 * time.Second)
+       defer checkTicker.Stop()
+
+       for {
+               select {
+               case <-checkTicker.C:
+                       s.RetryManager.ProcessRetryQueue(ctx)
+
+               case <-s.closer.CloseNotify():
+                       return
+               case <-ctx.Done():
+                       return
+               }
+       }
+}
+
 func (s *Service) discoveryLoop(ctx context.Context) {
        // add the init phase finish time
        initPhaseEnd := time.Now().Add(s.initDuration)
 
        for {
                if err := s.queryDNSAndUpdateNodes(ctx); err != nil {
-                       s.log.Err(err).Msg("failed to query DNS and update 
nodes")
+                       s.GetLogger().Err(err).Msg("failed to query DNS and 
update nodes")
                }
 
                // wait for next interval
@@ -281,13 +363,13 @@ func (s *Service) queryDNSAndUpdateNodes(ctx 
context.Context) error {
        for srv, err := range srvToErrors {
                if cachedAddrs, exists := s.lastSuccessfulDNS[srv]; exists {
                        finalAddresses[srv] = cachedAddrs
-                       s.log.Warn().
+                       s.GetLogger().Warn().
                                Str("srv", srv).
                                Err(err).
                                Strs("cached_addresses", cachedAddrs).
                                Msg("Using cached addresses for failed SRV 
query")
                } else {
-                       s.log.Warn().
+                       s.GetLogger().Warn().
                                Str("srv", srv).
                                Err(err).
                                Msg("SRV query failed and no cached addresses 
available")
@@ -308,12 +390,12 @@ func (s *Service) queryDNSAndUpdateNodes(ctx 
context.Context) error {
                return fmt.Errorf("all DNS queries failed and no cached 
addresses available: %w", errors.Join(allErrors...))
        }
 
-       if s.log.Debug().Enabled() {
+       if s.GetLogger().Debug().Enabled() {
                totalAddrs := 0
                for _, addrs := range finalAddresses {
                        totalAddrs += len(addrs)
                }
-               s.log.Debug().
+               s.GetLogger().Debug().
                        Int("total_addresses", totalAddrs).
                        Int("successful_srvs", len(srvToAddresses)).
                        Int("failed_srvs", len(srvToErrors)).
@@ -380,87 +462,74 @@ func (s *Service) queryAllSRVRecords(ctx context.Context) 
(map[string][]string,
 
 func (s *Service) updateNodeCache(ctx context.Context, srvToAddresses 
map[string][]string) error {
        var addErrors []error
+       s.addressMutex.Lock()
        for srvAddr, addrs := range srvToAddresses {
                for _, addr := range addrs {
-                       s.cacheMutex.RLock()
-                       _, exists := s.nodeCache[addr]
-                       s.cacheMutex.RUnlock()
+                       s.addressToSrvAddr[addr] = srvAddr
+               }
+       }
+       s.addressMutex.Unlock()
+
+       // process new nodes
+       for _, addrs := range srvToAddresses {
+               for _, addr := range addrs {
+                       _, exists := s.GetCachedNode(addr)
 
                        if !exists {
+                               if s.RetryManager.IsInRetry(addr) {
+                                       continue
+                               }
+
                                // fetch node metadata from gRPC
-                               node, fetchErr := s.fetchNodeMetadata(ctx, 
srvAddr, addr)
+                               node, fetchErr := s.fetchNodeMetadata(ctx, addr)
                                if fetchErr != nil {
-                                       s.log.Warn().
+                                       s.GetLogger().Warn().
                                                Err(fetchErr).
                                                Str("address", addr).
                                                Msg("Failed to fetch node 
metadata")
                                        addErrors = append(addErrors, fetchErr)
+                                       s.RetryManager.AddToRetry(addr, 
fetchErr)
                                        continue
                                }
 
-                               s.cacheMutex.Lock()
-                               if _, alreadyAdded := s.nodeCache[addr]; 
!alreadyAdded {
-                                       s.nodeCache[addr] = node
-
-                                       // notify handlers after releasing lock
-                                       s.notifyHandlers(schema.Metadata{
-                                               TypeMeta: schema.TypeMeta{
-                                                       Kind: schema.KindNode,
-                                                       Name: 
node.GetMetadata().GetName(),
-                                               },
-                                               Spec: node,
-                                       }, true)
-
-                                       s.log.Debug().
-                                               Str("address", addr).
-                                               Str("name", 
node.GetMetadata().GetName()).
-                                               Msg("New node discovered and 
added to cache")
-                               }
-                               s.cacheMutex.Unlock()
+                               s.AddNodeAndNotify(addr, node, "New node 
discovered and added to cache")
                        }
                }
        }
 
-       // collect nodes to delete first
+       // collect addresses to keep
        allAddr := make(map[string]bool)
        for _, addrs := range srvToAddresses {
                for _, addr := range addrs {
                        allAddr[addr] = true
                }
        }
-       s.cacheMutex.Lock()
-       nodesToDelete := make(map[string]*databasev1.Node)
-       for addr, node := range s.nodeCache {
+
+       // find nodes to delete
+       currentAddresses := s.GetAllNodeAddresses()
+       addressesToDelete := make([]string, 0)
+       for _, addr := range currentAddresses {
                if !allAddr[addr] {
-                       nodesToDelete[addr] = node
+                       addressesToDelete = append(addressesToDelete, addr)
                }
        }
 
-       // delete from cache while still holding lock
-       for addr, node := range nodesToDelete {
-               delete(s.nodeCache, addr)
-               s.log.Debug().
-                       Str("address", addr).
-                       Str("name", node.GetMetadata().GetName()).
-                       Msg("Node removed from cache (no longer in DNS)")
-       }
-       cacheSize := len(s.nodeCache)
-       s.cacheMutex.Unlock()
+       // delete nodes and notify handlers
+       s.RemoveNodesAndNotify(addressesToDelete, "Node removed from cache (no 
longer in DNS)")
 
-       // Notify handlers after releasing lock
-       for _, node := range nodesToDelete {
-               s.notifyHandlers(schema.Metadata{
-                       TypeMeta: schema.TypeMeta{
-                               Kind: schema.KindNode,
-                               Name: node.GetMetadata().GetName(),
-                       },
-                       Spec: node,
-               }, false)
+       // clean up address to SRV mapping for deleted nodes
+       s.addressMutex.Lock()
+       for _, addr := range addressesToDelete {
+               delete(s.addressToSrvAddr, addr)
        }
+       s.addressMutex.Unlock()
+
+       // clean up retry queue for removed nodes
+       s.RetryManager.CleanupRetryQueue(allAddr)
 
        // update total nodes metric
        if s.metrics != nil {
-               s.metrics.totalNodesCount.Set(float64(cacheSize))
+               s.metrics.totalNodesCount.Set(float64(s.GetCacheSize()))
        }
 
        if len(addErrors) > 0 {
@@ -470,7 +539,12 @@ func (s *Service) updateNodeCache(ctx context.Context, 
srvToAddresses map[string
        return nil
 }
 
-func (s *Service) fetchNodeMetadata(ctx context.Context, srvAddr, address 
string) (*databasev1.Node, error) {
+// FetchNodeWithRetry implements NodeFetcher interface for retry manager.
+func (s *Service) FetchNodeWithRetry(ctx context.Context, address string) (*databasev1.Node, error) {
+       return s.fetchNodeMetadata(ctx, address)
+}
+
+func (s *Service) fetchNodeMetadata(ctx context.Context, address string) (*databasev1.Node, error) {
        // record gRPC query metrics
        startTime := time.Now()
        var grpcErr error
@@ -486,70 +560,31 @@ func (s *Service) fetchNodeMetadata(ctx context.Context, 
srvAddr, address string
                }
        }()
 
-       ctxTimeout, cancel := context.WithTimeout(ctx, s.grpcTimeout)
-       defer cancel()
-
-       // for TLS connections with other nodes to getting metadata
-       dialOpts, err := s.getTLSDialOptions(srvAddr, address)
+       // use common fetcher
+       node, err := common.FetchNodeMetadata(ctx, address, s.grpcTimeout, s)
        if err != nil {
-               grpcErr = fmt.Errorf("failed to get TLS dial options: %w", err)
-               return nil, grpcErr
-       }
-       // nolint:contextcheck
-       conn, connErr := grpchelper.Conn(address, s.grpcTimeout, dialOpts...)
-       if connErr != nil {
-               grpcErr = fmt.Errorf("failed to connect to %s: %w", address, 
connErr)
-               return nil, grpcErr
-       }
-       defer conn.Close()
-
-       // query metadata of the node
-       client := databasev1.NewNodeQueryServiceClient(conn)
-       resp, callErr := client.GetCurrentNode(ctxTimeout, 
&databasev1.GetCurrentNodeRequest{})
-       if callErr != nil {
-               grpcErr = fmt.Errorf("failed to get current node from %s: %w", 
address, callErr)
-               return nil, grpcErr
-       }
-
-       return resp.GetNode(), nil
-}
-
-func (s *Service) notifyHandlers(metadata schema.Metadata, isAddOrUpdate bool) 
{
-       s.handlersMutex.RLock()
-       defer s.handlersMutex.RUnlock()
-
-       for _, handler := range s.handlers {
-               if isAddOrUpdate {
-                       handler.OnAddOrUpdate(metadata)
-               } else {
-                       handler.OnDelete(metadata)
-               }
+               grpcErr = err
+               return nil, err
        }
-}
-
-// RegisterHandler registers an event handler for node changes.
-func (s *Service) RegisterHandler(name string, handler schema.EventHandler) {
-       s.handlersMutex.Lock()
-       defer s.handlersMutex.Unlock()
-
-       s.handlers = append(s.handlers, handler)
-       s.log.Debug().Str("handler", name).Msg("Registered DNS node discovery 
handler")
+       return node, nil
 }
 
 // SetMetrics set the OMR metrics.
 func (s *Service) SetMetrics(factory observability.Factory) {
        s.metrics = newMetrics(factory)
+       s.DiscoveryServiceBase.SetMetrics(s.metrics)
 }
 
 // Close stops the DNS discovery service.
 func (s *Service) Close() error {
-       s.closer.Done()
+       s.closer.Done() // for discoveryLoop
+       s.closer.Done() // for retryScheduler
        s.closer.CloseThenWait()
 
        // stop all Reloaders
        for certPath, reloader := range s.pathToReloader {
                reloader.Stop()
-               s.log.Debug().Str("certPath", certPath).Msg("Stopped CA 
certificate reloader")
+               s.GetLogger().Debug().Str("certPath", certPath).Msg("Stopped CA 
certificate reloader")
        }
 
        return nil
@@ -566,42 +601,8 @@ func (s *Service) ListNode(ctx context.Context, role 
databasev1.Role) ([]*databa
                        return nil, err
                }
        }
-       s.cacheMutex.RLock()
-       defer s.cacheMutex.RUnlock()
-
-       var result []*databasev1.Node
-       for _, node := range s.nodeCache {
-               // filter by role if specified
-               if role != databasev1.Role_ROLE_UNSPECIFIED {
-                       hasRole := false
-                       for _, nodeRole := range node.GetRoles() {
-                               if nodeRole == role {
-                                       hasRole = true
-                                       break
-                               }
-                       }
-                       if !hasRole {
-                               continue
-                       }
-               }
-               result = append(result, node)
-       }
-
-       return result, nil
-}
-
-// GetNode current node from cache.
-func (s *Service) GetNode(_ context.Context, nodeName string) 
(*databasev1.Node, error) {
-       s.cacheMutex.RLock()
-       defer s.cacheMutex.RUnlock()
-
-       for _, node := range s.nodeCache {
-               if node.GetMetadata() != nil && node.GetMetadata().GetName() == 
nodeName {
-                       return node, nil
-               }
-       }
-
-       return nil, fmt.Errorf("node %s not found", nodeName)
+       // delegate to base for filtering
+       return s.NodeCacheBase.ListNode(ctx, role)
 }
 
 // RegisterNode update the configurations of a node, it should not be invoked.
diff --git a/banyand/metadata/dns/dns_test.go 
b/banyand/metadata/discovery/dns/dns_test.go
similarity index 99%
rename from banyand/metadata/dns/dns_test.go
rename to banyand/metadata/discovery/dns/dns_test.go
index 818df974..5467002f 100644
--- a/banyand/metadata/dns/dns_test.go
+++ b/banyand/metadata/discovery/dns/dns_test.go
@@ -35,7 +35,7 @@ import (
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       "github.com/apache/skywalking-banyandb/banyand/metadata/dns"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/discovery/dns"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
@@ -516,10 +516,9 @@ var _ = Describe("DNS Discovery Service", func() {
 
                        // Call queryDNSAndUpdateNodes again
                        // One DNS fails, so it should fallback to cached 
addresses (all 4 nodes)
+                       // With retry mechanism, nodes already in retry queue are skipped, so no errors returned
                        queryErr2 := svc.QueryDNSAndUpdateNodes(ctx)
-                       Expect(queryErr2).To(HaveOccurred())
-                       // Verify the gRPC error is expected (trying to connect 
to non-existent servers)
-                       Expect(queryErr2.Error()).To(ContainSubstring("failed 
to connect"))
+                       Expect(queryErr2).NotTo(HaveOccurred())
 
                        // Verify fallback happened - cache still has all 4 
nodes from first success
                        cachedAfterFailure := svc.GetLastSuccessfulDNS()
diff --git a/banyand/metadata/dns/export_test.go 
b/banyand/metadata/discovery/dns/export_test.go
similarity index 92%
rename from banyand/metadata/dns/export_test.go
rename to banyand/metadata/discovery/dns/export_test.go
index a28b4cf7..fd1b2564 100644
--- a/banyand/metadata/dns/export_test.go
+++ b/banyand/metadata/discovery/dns/export_test.go
@@ -63,12 +63,12 @@ func (s *Service) GetReloaderCount() int {
 
 // GetNodeCache returns the current node cache for testing.
 func (s *Service) GetNodeCache() map[string]*databasev1.Node {
-       s.cacheMutex.RLock()
-       defer s.cacheMutex.RUnlock()
-
-       result := make(map[string]*databasev1.Node, len(s.nodeCache))
-       for k, v := range s.nodeCache {
-               result[k] = v
+       result := make(map[string]*databasev1.Node)
+       addresses := s.GetAllNodeAddresses()
+       for _, addr := range addresses {
+               if node, exists := s.GetCachedNode(addr); exists {
+                       result[addr] = node
+               }
        }
        return result
 }
diff --git a/banyand/metadata/dns/metrics.go 
b/banyand/metadata/discovery/dns/metrics.go
similarity index 75%
rename from banyand/metadata/dns/metrics.go
rename to banyand/metadata/discovery/dns/metrics.go
index 87038580..04e0b0ca 100644
--- a/banyand/metadata/dns/metrics.go
+++ b/banyand/metadata/discovery/dns/metrics.go
@@ -44,6 +44,12 @@ type metrics struct {
        discoveryDuration      meter.Histogram
 
        totalNodesCount meter.Gauge
+
+       // Retry-related metrics
+       nodeRetryCount        meter.Counter
+       nodeRetryFailedCount  meter.Counter
+       nodeRetrySuccessCount meter.Counter
+       retryQueueSize        meter.Gauge
 }
 
 // newMetrics creates a new metrics instance.
@@ -70,5 +76,31 @@ func newMetrics(factory observability.Factory) *metrics {
                discoveryDuration:      
factory.NewHistogram("discovery_duration", meter.DefBuckets),
 
                totalNodesCount: factory.NewGauge("total_nodes_count"),
+
+               // Retry-related metrics
+               nodeRetryCount:        factory.NewCounter("node_retry_count"),
+               nodeRetryFailedCount:  
factory.NewCounter("node_retry_failed_count"),
+               nodeRetrySuccessCount: 
factory.NewCounter("node_retry_success_count"),
+               retryQueueSize:        factory.NewGauge("retry_queue_size"),
        }
 }
+
+// IncRetryCount implements RetryMetrics interface.
+func (m *metrics) IncRetryCount() {
+       m.nodeRetryCount.Inc(1)
+}
+
+// IncRetrySuccess implements RetryMetrics interface.
+func (m *metrics) IncRetrySuccess() {
+       m.nodeRetrySuccessCount.Inc(1)
+}
+
+// IncRetryFailed implements RetryMetrics interface.
+func (m *metrics) IncRetryFailed() {
+       m.nodeRetryFailedCount.Inc(1)
+}
+
+// SetQueueSize implements RetryMetrics interface.
+func (m *metrics) SetQueueSize(size float64) {
+       m.retryQueueSize.Set(size)
+}
diff --git a/banyand/metadata/dns/testdata/ca_cert.pem 
b/banyand/metadata/discovery/dns/testdata/ca_cert.pem
similarity index 100%
rename from banyand/metadata/dns/testdata/ca_cert.pem
rename to banyand/metadata/discovery/dns/testdata/ca_cert.pem
diff --git a/banyand/metadata/dns/testdata/server_key.pem 
b/banyand/metadata/discovery/dns/testdata/server_key.pem
similarity index 100%
rename from banyand/metadata/dns/testdata/server_key.pem
rename to banyand/metadata/discovery/dns/testdata/server_key.pem
diff --git a/banyand/metadata/discovery/file/file.go 
b/banyand/metadata/discovery/file/file.go
new file mode 100644
index 00000000..7938d466
--- /dev/null
+++ b/banyand/metadata/discovery/file/file.go
@@ -0,0 +1,466 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package file implements file-based node discovery for distributed metadata 
management.
+package file
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "os"
+       "sync"
+       "time"
+
+       "github.com/fsnotify/fsnotify"
+       "google.golang.org/grpc"
+       "gopkg.in/yaml.v3"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       
"github.com/apache/skywalking-banyandb/banyand/metadata/discovery/common"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
// Service implements file-based node discovery.
//
// Nodes are declared in a YAML file; the service loads the file once in
// Start, reacts to fsnotify change events, and additionally re-reads the
// file on a fixed interval as a fallback.
type Service struct {
	*common.DiscoveryServiceBase
	// addressToNodeConfig maps a node's gRPC address to its file entry;
	// guarded by configMutex.
	addressToNodeConfig map[string]NodeConfig
	// closer coordinates shutdown of the three background goroutines
	// launched by Start (see run.NewCloser(3) in NewService).
	closer *run.Closer
	// metrics is nil until SetMetrics is called; all uses are nil-checked.
	metrics *metrics
	// watcher delivers fsnotify events for filePath.
	watcher *fsnotify.Watcher
	// filePath is the YAML configuration file being watched.
	filePath string
	// grpcTimeout bounds each per-node metadata fetch.
	grpcTimeout time.Duration
	// fetchInterval is the period of the fallback full reload.
	fetchInterval time.Duration
	// reloadMutex serializes loadAndParseFile across fsnotify and
	// periodic triggers.
	reloadMutex sync.Mutex
	// configMutex guards addressToNodeConfig.
	configMutex sync.RWMutex
	// started records whether Start ran, so Close only waits for
	// goroutines that were actually launched.
	started bool
}
+
// Config holds configuration for file discovery service.
type Config struct {
	// FilePath is the YAML node list to load and watch; it must be
	// non-empty and exist when NewService is called.
	FilePath string
	// GRPCTimeout bounds each node metadata fetch over gRPC.
	GRPCTimeout time.Duration
	// FetchInterval is the period of the fallback full file reload.
	FetchInterval time.Duration
	// RetryInitialInterval, RetryMaxInterval, and RetryMultiplier shape
	// the backoff used for unreachable nodes; NewService validates that
	// the initial interval is positive, the max is >= the initial, and
	// the multiplier is >= 1.0.
	RetryInitialInterval time.Duration
	RetryMaxInterval     time.Duration
	RetryMultiplier      float64
}
+
// NodeFileConfig represents the YAML configuration file structure:
// a single document with a top-level "nodes" list.
type NodeFileConfig struct {
	Nodes []NodeConfig `yaml:"nodes"`
}
+
// NodeConfig represents a single node configuration.
type NodeConfig struct {
	// Name is a human-readable identifier used in log and error messages.
	Name string `yaml:"name"`
	// Address is the node's gRPC endpoint; required (validated on load).
	Address string `yaml:"grpc_address"`
	// CACertPath points at the CA certificate for this node; required
	// when TLSEnabled is true (validated on load).
	CACertPath string `yaml:"ca_cert_path"`
	// TLSEnabled turns on TLS for connections to this node.
	TLSEnabled bool `yaml:"tls_enabled"`
}
+
+// NewService creates a new file discovery service.
+func NewService(cfg Config) (*Service, error) {
+       if cfg.FilePath == "" {
+               return nil, errors.New("file path cannot be empty")
+       }
+
+       // validate file exists and is readable
+       if _, err := os.Stat(cfg.FilePath); err != nil {
+               return nil, fmt.Errorf("failed to access file path %s: %w", 
cfg.FilePath, err)
+       }
+
+       // validate retry config
+       if cfg.RetryMultiplier < 1.0 {
+               return nil, fmt.Errorf("retry multiplier must be >= 1.0, got 
%f", cfg.RetryMultiplier)
+       }
+       if cfg.RetryMaxInterval < cfg.RetryInitialInterval {
+               return nil, fmt.Errorf("retry max interval (%v) must be >= 
initial interval (%v)",
+                       cfg.RetryMaxInterval, cfg.RetryInitialInterval)
+       }
+       if cfg.RetryInitialInterval <= 0 {
+               return nil, errors.New("retry initial interval must be 
positive")
+       }
+
+       // create fsnotify watcher
+       fileWatcher, watchErr := fsnotify.NewWatcher()
+       if watchErr != nil {
+               return nil, fmt.Errorf("failed to create fsnotify watcher: %w", 
watchErr)
+       }
+
+       svc := &Service{
+               filePath:            cfg.FilePath,
+               addressToNodeConfig: make(map[string]NodeConfig),
+               closer:              run.NewCloser(3),
+               grpcTimeout:         cfg.GRPCTimeout,
+               fetchInterval:       cfg.FetchInterval,
+               watcher:             fileWatcher,
+       }
+
+       // initialize discovery service base
+       svc.DiscoveryServiceBase = common.NewServiceBase(
+               "metadata-discovery-file",
+               svc,
+               common.RetryConfig{
+                       InitialInterval: cfg.RetryInitialInterval,
+                       MaxInterval:     cfg.RetryMaxInterval,
+                       Multiplier:      cfg.RetryMultiplier,
+               },
+       )
+
+       // set callbacks for retry manager
+       svc.RetryManager.SetSuccessCallback(func(_ string, _ *databasev1.Node) {
+               if svc.metrics != nil {
+                       svc.metrics.nodeConnectedCount.Inc(1)
+               }
+       })
+
+       return svc, nil
+}
+
+// GetDialOptions implements GRPCDialOptionsProvider for file-specific 
per-node TLS.
+func (s *Service) GetDialOptions(address string) ([]grpc.DialOption, error) {
+       s.configMutex.RLock()
+       nodeConfig, exists := s.addressToNodeConfig[address]
+       s.configMutex.RUnlock()
+
+       if !exists {
+               return nil, fmt.Errorf("no node configuration found for address 
%s", address)
+       }
+
+       return grpchelper.SecureOptions(nil, nodeConfig.TLSEnabled, false, 
nodeConfig.CACertPath)
+}
+
+// Start begins the file discovery background process.
+func (s *Service) Start(ctx context.Context) error {
+       s.GetLogger().Debug().Str("file_path", s.filePath).Msg("Starting 
file-based node discovery service")
+
+       // initial load
+       if err := s.loadAndParseFile(ctx); err != nil {
+               return fmt.Errorf("failed to load initial configuration: %w", 
err)
+       }
+
+       // start fsnotify watcher
+       if err := s.watcher.Add(s.filePath); err != nil {
+               return fmt.Errorf("failed to add file to watcher: %w", err)
+       }
+
+       // mark as started
+       s.started = true
+
+       // start goroutines
+       go s.watchFileChanges(ctx)
+       go s.periodicFetch(ctx)
+       go s.retryScheduler(ctx)
+
+       return nil
+}
+
+func (s *Service) loadAndParseFile(ctx context.Context) error {
+       s.reloadMutex.Lock()
+       defer s.reloadMutex.Unlock()
+       startTime := time.Now()
+       var parseErr error
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.fileLoadCount.Inc(1)
+                       s.metrics.fileLoadDuration.Observe(duration.Seconds())
+                       if parseErr != nil {
+                               s.metrics.fileLoadFailedCount.Inc(1)
+                       }
+               }
+       }()
+
+       data, err := os.ReadFile(s.filePath)
+       if err != nil {
+               parseErr = fmt.Errorf("failed to read file: %w", err)
+               return parseErr
+       }
+
+       var cfg NodeFileConfig
+       if err := yaml.Unmarshal(data, &cfg); err != nil {
+               return fmt.Errorf("failed to parse YAML: %w", err)
+       }
+
+       // validate required fields
+       for idx, node := range cfg.Nodes {
+               if node.Address == "" {
+                       parseErr = fmt.Errorf("node %s at index %d is missing 
required field: grpc_address", node.Name, idx)
+                       return parseErr
+               }
+               if node.TLSEnabled && node.CACertPath == "" {
+                       parseErr = fmt.Errorf("node %s at index %d has TLS 
enabled but missing ca_cert_path", node.Name, idx)
+                       return parseErr
+               }
+       }
+
+       // update cache
+       s.updateNodeCache(ctx, cfg.Nodes)
+
+       s.GetLogger().Debug().Int("node_count", 
len(cfg.Nodes)).Msg("Successfully loaded configuration file")
+       return nil
+}
+
+// FetchNodeWithRetry implements NodeFetcher interface for retry manager.
+func (s *Service) FetchNodeWithRetry(ctx context.Context, address string) 
(*databasev1.Node, error) {
+       // use common fetcher with address
+       return common.FetchNodeMetadata(ctx, address, s.grpcTimeout, s)
+}
+
+func (s *Service) fetchNodeMetadata(ctx context.Context, nodeConfig 
NodeConfig) (*databasev1.Node, error) {
+       node, err := common.FetchNodeMetadata(ctx, nodeConfig.Address, 
s.grpcTimeout, s)
+       if err != nil {
+               return nil, fmt.Errorf("failed to fetch metadata for node %s: 
%w", nodeConfig.Name, err)
+       }
+
+       return node, nil
+}
+
// updateNodeCache reconciles the node cache and retry queue against the
// node list just read from the file. Steps, in order:
//  1. swap in the new address->config map (old map kept for comparison);
//  2. fetch metadata for nodes not yet cached, resetting retry state when
//     a node's config changed and queuing failures for retry;
//  3. drop cached nodes no longer present in the file;
//  4. purge retry-queue entries for removed addresses;
//  5. refresh gauges.
func (s *Service) updateNodeCache(ctx context.Context, newNodes []NodeConfig) {
	newAddressToNodeConfig := make(map[string]NodeConfig)
	for _, nodeConfig := range newNodes {
		newAddressToNodeConfig[nodeConfig.Address] = nodeConfig
	}

	// save old config for comparison and replace with new one
	s.configMutex.Lock()
	oldAddressToNodeConfig := s.addressToNodeConfig
	s.addressToNodeConfig = newAddressToNodeConfig
	s.configMutex.Unlock()

	// process new or updated nodes
	for _, nodeConfig := range newNodes {
		_, existsInCache := s.GetCachedNode(nodeConfig.Address)
		// node already connected, skip
		if existsInCache {
			continue
		}

		// node in retry queue - check if config changed
		if s.RetryManager.IsInRetry(nodeConfig.Address) {
			oldConfig, hasOldConfig := oldAddressToNodeConfig[nodeConfig.Address]

			if hasOldConfig && nodeConfigChanged(oldConfig, nodeConfig) {
				// changed TLS/cert settings may make the node reachable
				// again, so retry immediately instead of backing off
				s.GetLogger().Info().
					Str("address", nodeConfig.Address).
					Msg("Node configuration changed, resetting retry state")
				s.RetryManager.RemoveFromRetry(nodeConfig.Address)
			} else {
				// config unchanged, let retry scheduler handle it
				continue
			}
		}

		// try to fetch metadata for new node
		node, fetchErr := s.fetchNodeMetadata(ctx, nodeConfig)
		if fetchErr != nil {
			// add to retry queue
			s.RetryManager.AddToRetry(nodeConfig.Address, fetchErr)
			continue
		}

		// successfully fetched - add to cache
		if s.AddNodeAndNotify(nodeConfig.Address, node, "Node discovered and added to cache") {
			if s.metrics != nil {
				s.metrics.nodeConnectedCount.Inc(1)
			}
		}
	}

	// find nodes to delete
	currentAddresses := s.GetAllNodeAddresses()
	addressesToDelete := make([]string, 0)
	for _, addr := range currentAddresses {
		if _, inFile := newAddressToNodeConfig[addr]; !inFile {
			addressesToDelete = append(addressesToDelete, addr)
		}
	}

	// delete nodes and notify handlers
	s.RemoveNodesAndNotify(addressesToDelete, "Node removed from cache (no longer in file)")

	// clean up retry queue for removed nodes
	validAddresses := make(map[string]bool)
	for addr := range newAddressToNodeConfig {
		validAddresses[addr] = true
	}
	s.RetryManager.CleanupRetryQueue(validAddresses)

	// update metrics
	if s.metrics != nil {
		s.metrics.totalNodesCount.Set(float64(s.GetCacheSize()))
		s.metrics.retryQueueSize.Set(float64(s.RetryManager.GetQueueSize()))
	}
}
+
+// SetMetrics sets the OMR metrics.
+func (s *Service) SetMetrics(factory observability.Factory) {
+       s.metrics = newMetrics(factory)
+       s.DiscoveryServiceBase.SetMetrics(s.metrics)
+}
+
+// Close stops the file discovery service.
+func (s *Service) Close() error {
+       s.GetLogger().Debug().Msg("Closing file discovery service")
+
+       // close fsnotify watcher
+       if s.watcher != nil {
+               if err := s.watcher.Close(); err != nil {
+                       s.GetLogger().Error().Err(err).Msg("Failed to close 
fsnotify watcher")
+               }
+       }
+
+       // only wait for goroutines if Start() was called
+       if s.started {
+               s.closer.CloseThenWait()
+       }
+
+       return nil
+}
+
+// RegisterNode is not supported in file discovery mode.
+func (s *Service) RegisterNode(_ context.Context, _ *databasev1.Node, _ bool) 
error {
+       return errors.New("manual node registration not supported in file 
discovery mode")
+}
+
+// UpdateNode is not supported in file discovery mode.
+func (s *Service) UpdateNode(_ context.Context, _ *databasev1.Node) error {
+       return errors.New("manual node update not supported in file discovery 
mode")
+}
+
+func (s *Service) watchFileChanges(ctx context.Context) {
+       defer s.closer.Done()
+
+       for {
+               select {
+               case event, ok := <-s.watcher.Events:
+                       if !ok {
+                               s.GetLogger().Info().Msg("fsnotify watcher 
events channel closed")
+                               return
+                       }
+
+                       s.GetLogger().Debug().
+                               Str("file", event.Name).
+                               Str("op", event.Op.String()).
+                               Msg("Detected node metadata file changed event")
+
+                       // handle relevant events (Write, Create, Remove, 
Rename)
+                       // generally same as tlsReloader
+                       if 
event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Remove|fsnotify.Rename) != 0 {
+                               // re-add file to watcher if it was 
removed/renamed
+                               if event.Op&(fsnotify.Remove|fsnotify.Rename) 
!= 0 {
+                                       s.GetLogger().Info().Str("file", 
event.Name).Msg("File removed/renamed, re-adding to watcher")
+                                       _ = s.watcher.Remove(s.filePath)
+
+                                       // wait briefly for file operations to 
complete
+                                       time.Sleep(1 * time.Second)
+
+                                       // try to re-add with retry
+                                       maxRetries := 5
+                                       for attempt := 0; attempt < maxRetries; 
attempt++ {
+                                               if addErr := 
s.watcher.Add(s.filePath); addErr == nil {
+                                                       
s.GetLogger().Debug().Str("file", s.filePath).Msg("Re-added file to watcher")
+                                                       break
+                                               } else if attempt == 
maxRetries-1 {
+                                                       s.GetLogger().Error().
+                                                               Err(addErr).
+                                                               Str("file", 
s.filePath).
+                                                               Msg("Failed to 
re-add file to watcher after retries")
+                                               }
+                                               time.Sleep(500 * 
time.Millisecond)
+                                       }
+                               } else {
+                                       // wait for file stability
+                                       time.Sleep(200 * time.Millisecond)
+                               }
+
+                               // immediate reload
+                               if reloadErr := s.loadAndParseFile(ctx); 
reloadErr != nil {
+                                       
s.GetLogger().Warn().Err(reloadErr).Msg("Failed to reload configuration file 
(fsnotify)")
+                               } else {
+                                       s.GetLogger().Info().Msg("Configuration 
file reloaded successfully (fsnotify)")
+                               }
+                       }
+
+               case watchErr, ok := <-s.watcher.Errors:
+                       if !ok {
+                               s.GetLogger().Info().Msg("fsnotify watcher 
errors channel closed")
+                               return
+                       }
+                       s.GetLogger().Error().Err(watchErr).Msg("Error from 
fsnotify watcher")
+                       if s.metrics != nil {
+                               s.metrics.fileLoadFailedCount.Inc(1)
+                       }
+
+               case <-s.closer.CloseNotify():
+                       return
+               case <-ctx.Done():
+                       return
+               }
+       }
+}
+
+func (s *Service) periodicFetch(ctx context.Context) {
+       defer s.closer.Done()
+
+       fetchTicker := time.NewTicker(s.fetchInterval)
+       defer fetchTicker.Stop()
+
+       for {
+               select {
+               case <-fetchTicker.C:
+                       if err := s.loadAndParseFile(ctx); err != nil {
+                               s.GetLogger().Warn().Err(err).Msg("Failed to 
reload configuration file (periodic)")
+                       } else {
+                               s.GetLogger().Debug().Msg("Configuration file 
reloaded successfully (periodic)")
+                       }
+               case <-s.closer.CloseNotify():
+                       return
+               case <-ctx.Done():
+                       return
+               }
+       }
+}
+
+func (s *Service) retryScheduler(ctx context.Context) {
+       defer s.closer.Done()
+
+       // check retry queue every second
+       checkTicker := time.NewTicker(1 * time.Second)
+       defer checkTicker.Stop()
+
+       for {
+               select {
+               case <-checkTicker.C:
+                       s.RetryManager.ProcessRetryQueue(ctx)
+
+               case <-s.closer.CloseNotify():
+                       return
+               case <-ctx.Done():
+                       return
+               }
+       }
+}
+
+func nodeConfigChanged(older, newer NodeConfig) bool {
+       return older.Address != newer.Address ||
+               older.CACertPath != newer.CACertPath ||
+               older.TLSEnabled != newer.TLSEnabled
+}
diff --git a/banyand/metadata/discovery/file/file_test.go 
b/banyand/metadata/discovery/file/file_test.go
new file mode 100644
index 00000000..8d0620b3
--- /dev/null
+++ b/banyand/metadata/discovery/file/file_test.go
@@ -0,0 +1,616 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package file
+
+import (
+       "context"
+       "fmt"
+       "net"
+       "os"
+       "path/filepath"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/health"
+       grpc_health_v1 "google.golang.org/grpc/health/grpc_health_v1"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+)
+
// Shared tuning values for the services under test: intervals are short so
// tests converge quickly, while the retry bounds satisfy NewService's
// validation (initial > 0, max >= initial, multiplier >= 1.0).
const (
	testGRPCTimeout          = 2 * time.Second
	testFetchInterval        = 200 * time.Millisecond
	testRetryInitialInterval = 1 * time.Second
	testRetryMaxInterval     = 5 * time.Minute
	testRetryMultiplier      = 2.0
)
+
+func testConfig(filePath string) Config {
+       return Config{
+               FilePath:             filePath,
+               GRPCTimeout:          testGRPCTimeout,
+               FetchInterval:        testFetchInterval,
+               RetryInitialInterval: testRetryInitialInterval,
+               RetryMaxInterval:     testRetryMaxInterval,
+               RetryMultiplier:      testRetryMultiplier,
+       }
+}
+
// TestNewService covers constructor validation: a well-formed file yields
// a usable (and closable) service, while an empty or unreadable path is
// rejected with a descriptive error.
func TestNewService(t *testing.T) {
	t.Run("valid config", func(t *testing.T) {
		configFile := createTempConfigFile(t, `
nodes:
  - name: node1
    grpc_address: 127.0.0.1:17912
`)
		defer os.Remove(configFile)

		svc, err := NewService(testConfig(configFile))
		require.NoError(t, err)
		require.NotNil(t, svc)
		// Close must succeed even though Start was never called.
		require.NoError(t, svc.Close())
	})

	t.Run("empty file path", func(t *testing.T) {
		_, err := NewService(Config{FilePath: ""})
		require.Error(t, err)
		assert.Contains(t, err.Error(), "file path cannot be empty")
	})

	t.Run("non-existent file", func(t *testing.T) {
		_, err := NewService(testConfig("/not/exist"))
		require.Error(t, err)
		assert.Contains(t, err.Error(), "failed to access file path")
	})
}
+
// TestStartWithInvalidConfig verifies that Start surfaces file-content
// problems (malformed YAML, missing grpc_address, TLS without CA cert) as
// errors rather than starting with a broken configuration.
func TestStartWithInvalidConfig(t *testing.T) {
	ctx := context.Background()

	t.Run("invalid yaml", func(t *testing.T) {
		configFile := createTempConfigFile(t, `
nodes:
  - name: node1
    grpc_address: [invalid
`)
		defer os.Remove(configFile)

		svc, err := NewService(testConfig(configFile))
		require.NoError(t, err)
		defer svc.Close()

		err = svc.Start(ctx)
		require.Error(t, err)
		assert.Contains(t, err.Error(), "failed to parse YAML")
	})

	t.Run("missing address", func(t *testing.T) {
		configFile := createTempConfigFile(t, `
nodes:
  - name: node1
`)
		defer os.Remove(configFile)

		svc, err := NewService(testConfig(configFile))
		require.NoError(t, err)
		defer svc.Close()

		err = svc.Start(ctx)
		require.Error(t, err)
		assert.Contains(t, err.Error(), "missing required field: grpc_address")
	})

	t.Run("tls enabled without ca cert", func(t *testing.T) {
		configFile := createTempConfigFile(t, `
nodes:
  - name: node1
    grpc_address: 127.0.0.1:17912
    tls_enabled: true
`)
		defer os.Remove(configFile)

		svc, err := NewService(testConfig(configFile))
		require.NoError(t, err)
		defer svc.Close()

		err = svc.Start(ctx)
		require.Error(t, err)
		assert.Contains(t, err.Error(), "missing ca_cert_path")
	})
}
+
// TestStartAndCacheNodes exercises the happy path: a node listed in the
// file and served by a mock gRPC backend ends up in the cache, retrievable
// via both ListNode and GetNode.
func TestStartAndCacheNodes(t *testing.T) {
	listener, grpcServer, nodeServer := startMockGRPCServer(t)
	defer grpcServer.Stop()
	defer listener.Close()

	nodeName := "test-node"
	serverAddr := listener.Addr().String()
	nodeServer.setNode(newTestNode(nodeName, serverAddr))

	configFile := createTempConfigFile(t, fmt.Sprintf(`
nodes:
  - name: %s
    grpc_address: %s
`, nodeName, serverAddr))
	defer os.Remove(configFile)

	svc, err := NewService(testConfig(configFile))
	require.NoError(t, err)
	defer svc.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	require.NoError(t, svc.Start(ctx))

	// Start loads synchronously, so the cache is populated immediately.
	nodes, listErr := svc.ListNode(ctx, databasev1.Role_ROLE_UNSPECIFIED)
	require.NoError(t, listErr)
	require.Len(t, nodes, 1)
	assert.Equal(t, nodeName, nodes[0].GetMetadata().GetName())
	assert.Equal(t, serverAddr, nodes[0].GetGrpcAddress())

	nodeFromCache, getErr := svc.GetNode(ctx, nodeName)
	require.NoError(t, getErr)
	assert.Equal(t, nodeName, nodeFromCache.GetMetadata().GetName())
}
+
// TestHandlerNotifications verifies that registered schema handlers receive
// add events for discovered nodes and delete events when a node disappears
// from the configuration file after a rewrite.
func TestHandlerNotifications(t *testing.T) {
	listenerOne, grpcServerOne, nodeServerOne := startMockGRPCServer(t)
	defer grpcServerOne.Stop()
	defer listenerOne.Close()
	addrOne := listenerOne.Addr().String()
	nodeServerOne.setNode(newTestNode("node-one", addrOne))

	listenerTwo, grpcServerTwo, nodeServerTwo := startMockGRPCServer(t)
	defer grpcServerTwo.Stop()
	defer listenerTwo.Close()
	addrTwo := listenerTwo.Addr().String()
	nodeServerTwo.setNode(newTestNode("node-two", addrTwo))

	configFile := createTempConfigFile(t, fmt.Sprintf(`
nodes:
  - name: node-one
    grpc_address: %s
`, addrOne))
	defer os.Remove(configFile)

	svc, err := NewService(testConfig(configFile))
	require.NoError(t, err)
	defer svc.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// mu guards the event slices written from the handler callbacks.
	var mu sync.Mutex
	added := make([]string, 0)
	deleted := make([]string, 0)
	handler := &testEventHandler{
		onAdd: func(metadata schema.Metadata) {
			mu.Lock()
			defer mu.Unlock()
			added = append(added, metadata.Name)
		},
		onDelete: func(metadata schema.Metadata) {
			mu.Lock()
			defer mu.Unlock()
			deleted = append(deleted, metadata.Name)
		},
	}
	svc.RegisterHandler("test", handler)

	require.NoError(t, svc.Start(ctx))

	require.Eventually(t, func() bool {
		mu.Lock()
		defer mu.Unlock()
		return len(added) == 1 && added[0] == "node-one"
	}, 3*time.Second, 50*time.Millisecond)

	// replace node-one with node-two; the fsnotify/periodic reload should
	// add node-two and delete node-one
	updateConfigFile(t, configFile, fmt.Sprintf(`
nodes:
  - name: node-two
    grpc_address: %s
`, addrTwo))

	require.Eventually(t, func() bool {
		mu.Lock()
		defer mu.Unlock()
		return len(added) == 2 && added[1] == "node-two" && len(deleted) == 1 && deleted[0] == "node-one"
	}, 5*time.Second, 50*time.Millisecond)

	nodes, listErr := svc.ListNode(ctx, databasev1.Role_ROLE_UNSPECIFIED)
	require.NoError(t, listErr)
	require.Len(t, nodes, 1)
	assert.Equal(t, "node-two", nodes[0].GetMetadata().GetName())
}
+
// TestListNodeRoleFilter checks that ListNode's role filter excludes nodes
// whose role does not match (the test node carries no role, so filtering
// by ROLE_DATA yields nothing).
func TestListNodeRoleFilter(t *testing.T) {
	listener, grpcServer, nodeServer := startMockGRPCServer(t)
	defer grpcServer.Stop()
	defer listener.Close()
	addr := listener.Addr().String()
	nodeServer.setNode(newTestNode("role-test-node", addr))

	configFile := createTempConfigFile(t, fmt.Sprintf(`
nodes:
  - name: role-test-node
    grpc_address: %s
`, addr))
	defer os.Remove(configFile)

	svc, err := NewService(testConfig(configFile))
	require.NoError(t, err)
	defer svc.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	require.NoError(t, svc.Start(ctx))

	nodes, listErr := svc.ListNode(ctx, databasev1.Role_ROLE_UNSPECIFIED)
	require.NoError(t, listErr)
	require.Len(t, nodes, 1)

	nodes, listErr = svc.ListNode(ctx, databasev1.Role_ROLE_DATA)
	require.NoError(t, listErr)
	assert.Empty(t, nodes)
}
+
// TestGetNode verifies cache lookup by name: a discovered node is found,
// and an unknown name yields a "not found" error.
func TestGetNode(t *testing.T) {
	listener, grpcServer, nodeServer := startMockGRPCServer(t)
	defer grpcServer.Stop()
	defer listener.Close()
	addr := listener.Addr().String()
	nodeServer.setNode(newTestNode("cached-node", addr))

	configFile := createTempConfigFile(t, fmt.Sprintf(`
nodes:
  - name: cached-node
    grpc_address: %s
`, addr))
	defer os.Remove(configFile)

	svc, err := NewService(testConfig(configFile))
	require.NoError(t, err)
	defer svc.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	require.NoError(t, svc.Start(ctx))

	node, getErr := svc.GetNode(ctx, "cached-node")
	require.NoError(t, getErr)
	assert.Equal(t, "cached-node", node.GetMetadata().GetName())

	_, getErr = svc.GetNode(ctx, "non-existent")
	require.Error(t, getErr)
	assert.Contains(t, getErr.Error(), "not found")
}
+
// TestConcurrentAccess hammers ListNode/GetNode from ten goroutines while
// the background reload loops run, to surface data races (run with -race).
func TestConcurrentAccess(t *testing.T) {
	listener, grpcServer, nodeServer := startMockGRPCServer(t)
	defer grpcServer.Stop()
	defer listener.Close()
	addr := listener.Addr().String()
	nodeServer.setNode(newTestNode("concurrent-node", addr))

	configFile := createTempConfigFile(t, fmt.Sprintf(`
nodes:
  - name: concurrent-node
    grpc_address: %s
`, addr))
	defer os.Remove(configFile)

	svc, err := NewService(testConfig(configFile))
	require.NoError(t, err)
	defer svc.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	require.NoError(t, svc.Start(ctx))

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 100; j++ {
				if _, errList := svc.ListNode(ctx, databasev1.Role_ROLE_UNSPECIFIED); errList != nil {
					t.Errorf("list node: %v", errList)
				}
				if _, errGet := svc.GetNode(ctx, "concurrent-node"); errGet != nil {
					t.Errorf("get node: %v", errGet)
				}
			}
		}()
	}

	wg.Wait()
}
+
+func createTempConfigFile(t *testing.T, content string) string {
+       t.Helper()
+       tmpDir := t.TempDir()
+       tmpFile := filepath.Join(tmpDir, "nodes.yaml")
+       require.NoError(t, os.WriteFile(tmpFile, []byte(content), 0o600))
+       return tmpFile
+}
+
+func updateConfigFile(t *testing.T, path string, content string) {
+       t.Helper()
+       require.NoError(t, os.WriteFile(path, []byte(content), 0o600))
+}
+
+func newTestNode(name, address string) *databasev1.Node {
+       return &databasev1.Node{
+               Metadata: &commonv1.Metadata{
+                       Name: name,
+               },
+               GrpcAddress: address,
+               CreatedAt:   timestamppb.Now(),
+       }
+}
+
// testEventHandler is a schema.EventHandler test double that forwards
// add/update and delete notifications to optional callbacks.
type testEventHandler struct {
	onAdd    func(metadata schema.Metadata)
	onDelete func(metadata schema.Metadata)
}

// OnInit reports that no revision replay is required.
func (h *testEventHandler) OnInit(_ []schema.Kind) (bool, []int64) {
	return false, nil
}

// OnAddOrUpdate forwards to onAdd when set.
func (h *testEventHandler) OnAddOrUpdate(metadata schema.Metadata) {
	if h.onAdd != nil {
		h.onAdd(metadata)
	}
}

// OnDelete forwards to onDelete when set.
func (h *testEventHandler) OnDelete(metadata schema.Metadata) {
	if h.onDelete != nil {
		h.onDelete(metadata)
	}
}
+
+type mockNodeQueryServer struct {
+       databasev1.UnimplementedNodeQueryServiceServer
+       node *databasev1.Node
+       mu   sync.RWMutex
+}
+
+func (m *mockNodeQueryServer) GetCurrentNode(_ context.Context, _ 
*databasev1.GetCurrentNodeRequest) (*databasev1.GetCurrentNodeResponse, error) {
+       m.mu.RLock()
+       defer m.mu.RUnlock()
+       if m.node == nil {
+               return nil, fmt.Errorf("no node available")
+       }
+       return &databasev1.GetCurrentNodeResponse{Node: m.node}, nil
+}
+
+func (m *mockNodeQueryServer) setNode(node *databasev1.Node) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       m.node = node
+}
+
+func startMockGRPCServer(t *testing.T) (net.Listener, *grpc.Server, 
*mockNodeQueryServer) {
+       t.Helper()
+       listener, err := net.Listen("tcp", "127.0.0.1:0")
+       require.NoError(t, err)
+
+       mockServer := &mockNodeQueryServer{}
+       grpcServer := grpc.NewServer()
+       databasev1.RegisterNodeQueryServiceServer(grpcServer, mockServer)
+
+       healthServer := health.NewServer()
+       healthServer.SetServingStatus("", 
grpc_health_v1.HealthCheckResponse_SERVING)
+       grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
+
+       go func() {
+               _ = grpcServer.Serve(listener)
+       }()
+
+       return listener, grpcServer, mockServer
+}
+
+func TestBackoffRetryMechanism(t *testing.T) {
+       ctx := context.Background()
+
+       // create a mock server that will fail initially
+       listener, grpcServer, mockServer := startMockGRPCServer(t)
+       defer grpcServer.Stop()
+       defer listener.Close()
+
+       // initially return error (node not available)
+       mockServer.setNode(nil)
+
+       // create config file with the failing node
+       address := listener.Addr().String()
+       configFile := createTempConfigFile(t, fmt.Sprintf(`
+nodes:
+  - name: retry-node
+    grpc_address: %s
+`, address))
+       defer os.Remove(configFile)
+
+       // use shorter intervals for testing
+       cfg := Config{
+               FilePath:             configFile,
+               GRPCTimeout:          testGRPCTimeout,
+               FetchInterval:        testFetchInterval,
+               RetryInitialInterval: 100 * time.Millisecond,
+               RetryMaxInterval:     1 * time.Second,
+               RetryMultiplier:      2.0,
+       }
+
+       svc, err := NewService(cfg)
+       require.NoError(t, err)
+       defer svc.Close()
+
+       err = svc.Start(ctx)
+       require.NoError(t, err)
+
+       // verify node is in retry queue
+       time.Sleep(150 * time.Millisecond)
+       inRetry := svc.RetryManager.IsInRetry(address)
+       require.True(t, inRetry, "Node should be in retry queue")
+       require.Greater(t, svc.RetryManager.GetQueueSize(), 0, "Retry queue 
should not be empty")
+
+       // verify node is not in cache yet
+       _, inCache := svc.GetCachedNode(address)
+       require.False(t, inCache, "Node should not be in cache yet")
+
+       // now make the node available
+       mockServer.setNode(newTestNode("retry-node", address))
+
+       // wait for retry to succeed
+       require.Eventually(t, func() bool {
+               _, exists := svc.GetCachedNode(address)
+               return exists
+       }, 3*time.Second, 50*time.Millisecond, "Node should eventually be added 
to cache after retry")
+
+       // verify node is removed from retry queue
+       stillInRetry := svc.RetryManager.IsInRetry(address)
+       require.False(t, stillInRetry, "Node should be removed from retry queue 
after success")
+}
+
+func TestBackoffResetOnConfigChange(t *testing.T) {
+       ctx := context.Background()
+
+       listener, grpcServer, mockServer := startMockGRPCServer(t)
+       defer grpcServer.Stop()
+       defer listener.Close()
+
+       mockServer.setNode(nil) // fail initially
+
+       address := listener.Addr().String()
+       configFile := createTempConfigFile(t, fmt.Sprintf(`
+nodes:
+  - name: reset-test-node
+    grpc_address: %s
+    tls_enabled: false
+`, address))
+       defer os.Remove(configFile)
+
+       cfg := Config{
+               FilePath:             configFile,
+               GRPCTimeout:          testGRPCTimeout,
+               FetchInterval:        testFetchInterval,
+               RetryInitialInterval: 100 * time.Millisecond,
+               RetryMaxInterval:     1 * time.Second,
+               RetryMultiplier:      2.0,
+       }
+
+       svc, err := NewService(cfg)
+       require.NoError(t, err)
+       defer svc.Close()
+
+       err = svc.Start(ctx)
+       require.NoError(t, err)
+
+       // wait for node to enter retry queue
+       time.Sleep(150 * time.Millisecond)
+
+       inRetry := svc.RetryManager.IsInRetry(address)
+       require.True(t, inRetry, "Node should be in retry queue")
+       initialQueueSize := svc.RetryManager.GetQueueSize()
+       require.Greater(t, initialQueueSize, 0, "Retry queue should not be 
empty")
+
+       // wait a bit for retries to occur
+       time.Sleep(250 * time.Millisecond)
+
+       // verify still in retry (since mock server still returns error)
+       stillInRetry := svc.RetryManager.IsInRetry(address)
+       require.True(t, stillInRetry, "Node should still be in retry queue")
+
+       // now change the config (enable TLS) - this should reset retry state
+       err = os.WriteFile(configFile, []byte(fmt.Sprintf(`
+nodes:
+  - name: reset-test-node-updated
+    grpc_address: %s
+    tls_enabled: true
+    ca_cert_path: /tmp/ca.crt
+`, address)), 0o600)
+       require.NoError(t, err)
+
+       // wait for file change detection and reload
+       // After config change, the node should be removed from retry and 
re-tried immediately
+       // Since it still fails, it will go back into retry queue
+       time.Sleep(500 * time.Millisecond)
+
+       // verify still in retry queue (config changed, but still failing)
+       inRetryAfterChange := svc.RetryManager.IsInRetry(address)
+       require.True(t, inRetryAfterChange, "Node should be in retry queue 
after config change")
+}
+
+func TestRetryQueueCleanupOnFileRemoval(t *testing.T) {
+       ctx := context.Background()
+
+       listener, grpcServer, mockServer := startMockGRPCServer(t)
+       defer grpcServer.Stop()
+       defer listener.Close()
+
+       mockServer.setNode(nil) // fail initially
+
+       address := listener.Addr().String()
+       configFile := createTempConfigFile(t, fmt.Sprintf(`
+nodes:
+  - name: cleanup-test-node
+    grpc_address: %s
+`, address))
+       defer os.Remove(configFile)
+
+       cfg := Config{
+               FilePath:             configFile,
+               GRPCTimeout:          testGRPCTimeout,
+               FetchInterval:        testFetchInterval,
+               RetryInitialInterval: 100 * time.Millisecond,
+               RetryMaxInterval:     1 * time.Second,
+               RetryMultiplier:      2.0,
+       }
+
+       svc, err := NewService(cfg)
+       require.NoError(t, err)
+       defer svc.Close()
+
+       err = svc.Start(ctx)
+       require.NoError(t, err)
+
+       // wait for node to enter retry queue
+       time.Sleep(150 * time.Millisecond)
+
+       inRetry := svc.RetryManager.IsInRetry(address)
+       require.True(t, inRetry, "Node should be in retry queue")
+
+       // remove node from file
+       err = os.WriteFile(configFile, []byte(`nodes: []`), 0o600)
+       require.NoError(t, err)
+
+       // wait for file change detection and reload
+       time.Sleep(500 * time.Millisecond)
+
+       // verify node is removed from retry queue
+       stillInRetry := svc.RetryManager.IsInRetry(address)
+       require.False(t, stillInRetry, "Node should be removed from retry 
queue")
+}
diff --git a/banyand/metadata/discovery/file/metrics.go 
b/banyand/metadata/discovery/file/metrics.go
new file mode 100644
index 00000000..60d49ba4
--- /dev/null
+++ b/banyand/metadata/discovery/file/metrics.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package file
+
+import (
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+type metrics struct {
+       fileLoadCount         meter.Counter
+       fileLoadFailedCount   meter.Counter
+       fileLoadDuration      meter.Histogram
+       totalNodesCount       meter.Gauge
+       nodeRetryCount        meter.Counter
+       nodeRetryFailedCount  meter.Counter
+       nodeRetrySuccessCount meter.Counter
+       nodeConnectedCount    meter.Counter
+       retryQueueSize        meter.Gauge
+}
+
+// newMetrics creates a new metrics instance.
+func newMetrics(factory observability.Factory) *metrics {
+       return &metrics{
+               fileLoadCount:         factory.NewCounter("file_load_count"),
+               fileLoadFailedCount:   
factory.NewCounter("file_load_failed_count"),
+               fileLoadDuration:      
factory.NewHistogram("file_load_duration", meter.DefBuckets),
+               totalNodesCount:       factory.NewGauge("total_nodes_count"),
+               nodeRetryCount:        factory.NewCounter("node_retry_count"),
+               nodeRetryFailedCount:  
factory.NewCounter("node_retry_failed_count"),
+               nodeRetrySuccessCount: 
factory.NewCounter("node_retry_success_count"),
+               nodeConnectedCount:    
factory.NewCounter("node_connected_count"),
+               retryQueueSize:        factory.NewGauge("retry_queue_size"),
+       }
+}
+
+// IncRetryCount implements RetryMetrics interface.
+func (m *metrics) IncRetryCount() {
+       m.nodeRetryCount.Inc(1)
+}
+
+// IncRetrySuccess implements RetryMetrics interface.
+func (m *metrics) IncRetrySuccess() {
+       m.nodeRetrySuccessCount.Inc(1)
+}
+
+// IncRetryFailed implements RetryMetrics interface.
+func (m *metrics) IncRetryFailed() {
+       m.nodeRetryFailedCount.Inc(1)
+}
+
+// SetQueueSize implements RetryMetrics interface.
+func (m *metrics) SetQueueSize(size float64) {
+       m.retryQueueSize.Set(size)
+}
diff --git a/docs/operation/node-discovery.md b/docs/operation/node-discovery.md
index 990182f1..ee45887f 100644
--- a/docs/operation/node-discovery.md
+++ b/docs/operation/node-discovery.md
@@ -7,12 +7,13 @@ Node discovery enables BanyanDB nodes to locate and 
communicate with each other
 1. **Request Routing**: When liaison nodes need to send requests to data nodes 
for query/write execution.
 2. **Data Migration**: When the lifecycle service queries the node list to 
perform shard migration and rebalancing.
 
-BanyanDB supports two discovery mechanisms to accommodate different deployment 
environments:
+BanyanDB supports three discovery mechanisms to accommodate different 
deployment environments:
 
 - **Etcd-based Discovery**: Traditional distributed consensus approach 
suitable for VM deployments and multi-cloud scenarios.
 - **DNS-based Discovery**: Cloud-native solution leveraging Kubernetes service 
discovery infrastructure.
+- **File-based Discovery**: Static configuration file approach for simple 
deployments and testing environments.
 
-This document provides guidance on configuring and operating both discovery 
modes.
+This document provides guidance on configuring and operating all discovery 
modes.
 
 ## Etcd-Based Discovery
 
@@ -160,9 +161,13 @@ DNS-based discovery provides a cloud-native alternative 
leveraging Kubernetes' b
 
 # TLS configuration
 --node-discovery-dns-tls=true                # Enable TLS for DNS discovery 
(default: false)
---node-discovery-dns-ca-certs=/path/to/ca.crt # CA certificates 
(comma-separated)
+--node-discovery-dns-ca-certs=/path/to/ca.crt,/path/to/another.crt # Ordered 
CA bundle matching SRV addresses
 ```
 
+`node-discovery-dns-fetch-init-interval` and 
`node-discovery-dns-fetch-init-duration` define the aggressive polling strategy 
during bootstrap before falling back to the steady-state
+`node-discovery-dns-fetch-interval`. All metadata fetches share the same 
`node-discovery-grpc-timeout`, which is also reused by the file discovery mode. 
When TLS is enabled, the CA cert
+list must match the SRV address order, ensuring each role (e.g., data, 
liaison) can be verified with its own trust bundle.
+
 ### Configuration Examples
 
 **Single Zone Discovery:**
@@ -178,7 +183,7 @@ banyand data \
 **Multi-Zone Discovery:**
 
 ```shell
-banyand query \
+banyand data \
   --node-discovery-mode=dns \
   
--node-discovery-dns-srv-addresses=_grpc._tcp.data.zone1.svc.local,_grpc._tcp.data.zone2.svc.local
 \
   --node-discovery-dns-fetch-interval=10s
@@ -278,6 +283,110 @@ spec:
 
 This creates DNS SRV record: 
`_grpc._tcp.banyandb-data.default.svc.cluster.local`
 
+## File-Based Discovery
+
+### Overview
+
+File-based discovery provides a simple static configuration approach where 
nodes are defined in a YAML file. This mode is ideal for testing environments, 
small deployments, or scenarios where dynamic service discovery infrastructure 
is unavailable.
+
+The service periodically reloads the configuration file and automatically 
updates the node registry when changes are detected.
+
+### How it Works
+1. Read node configurations from a YAML file on startup
+2. Attempt to connect to each node via gRPC to fetch full metadata
+3. Successfully connected nodes are added to the cache
+4. Nodes that fail to connect enter an exponential backoff retry queue (see the 
`node-discovery-file-retry-*` flags) and are also reprocessed on the next periodic file reload
+5. Reload the file at the `node-discovery-file-fetch-interval` cadence as a 
backup to fsnotify-based reloads, reprocessing every entry (including nodes 
that previously failed)
+6. Notify registered handlers when nodes are added or removed
+
+### Configuration Flags
+
+```shell
+# Mode selection
+--node-discovery-mode=file                    # Enable file mode
+
+# File path (required for file mode)
+--node-discovery-file-path=/path/to/nodes.yaml
+
+# gRPC settings
+--node-discovery-grpc-timeout=5s             # Timeout for metadata fetch 
(default: 5s)
+
+# Interval settings
+--node-discovery-file-fetch-interval=5m               # Polling interval to 
reprocess the discovery file (default: 5m)
+--node-discovery-file-retry-initial-interval=1s       # Initial retry delay 
for failed node metadata fetches (default: 1s)
+--node-discovery-file-retry-max-interval=2m           # Upper bound for retry 
delay backoff (default: 2m)
+--node-discovery-file-retry-multiplier=2.0            # Multiplicative factor 
applied between retries (default: 2.0)
+```
+
+`node-discovery-file-fetch-interval` controls the periodic full reload that 
acts as a safety net even if filesystem events are missed.  
+`node-discovery-file-retry-*` flags configure the exponential backoff used 
when a node cannot be reached over gRPC. Failed nodes are retried starting from 
the initial interval,
+multiplied by the configured factor until the max interval is reached.
+
+### YAML Configuration Format
+
+```yaml
+nodes:
+  - name: liaison-0
+    grpc_address: 192.168.1.10:18912
+  - name: data-hot-0
+    grpc_address: 192.168.1.20:17912
+    tls_enabled: true
+    ca_cert_path: /etc/banyandb/certs/ca.crt
+  - name: data-cold-0
+    grpc_address: 192.168.1.30:17912
+```
+
+**Configuration Fields:**
+
+- **name** (required): Identifier for the node
+- **grpc_address** (required): gRPC endpoint in `host:port` format
+- **tls_enabled** (optional): Enable TLS for gRPC connection (default: false)
+- **ca_cert_path** (optional): Path to CA certificate file (required when TLS 
is enabled)
+
+### Configuration Examples
+
+**Basic Configuration:**
+
+```shell
+banyand data \
+  --node-discovery-mode=file \
+  --node-discovery-file-path=/etc/banyandb/nodes.yaml
+```
+
+**With Custom Polling and Retry Settings:**
+
+```shell
+banyand data \
+  --node-discovery-mode=file \
+  --node-discovery-file-path=/etc/banyandb/nodes.yaml \
+  --node-discovery-file-fetch-interval=20m \
+  --node-discovery-file-retry-initial-interval=5s \
+  --node-discovery-file-retry-max-interval=1m \
+  --node-discovery-file-retry-multiplier=1.5
+```
+
+### Node Lifecycle
+
+#### Initial/Interval Load
+
+When the service starts:
+1. Reads the YAML configuration file
+2. Validates required fields (`name`, `grpc_address`)
+3. Attempts gRPC connection to each node
+4. Successfully connected nodes → added to cache; failed nodes → scheduled for exponential backoff retry
+
+### Error Handling
+
+**Startup Errors:**
+- Missing or invalid file path → service fails to start
+- Invalid YAML format → service fails to start
+- Missing required fields → service fails to start
+
+**Runtime Errors:**
+- gRPC connection failure → node enters the exponential backoff retry queue; also reprocessed on the next file reload
+- File read error → keep existing cache, log error
+- File deleted → keep existing cache, log error
+
 ## Choosing a Discovery Mode
 
 ### Etcd Mode - Best For
@@ -293,4 +402,13 @@ This creates DNS SRV record: 
`_grpc._tcp.banyandb-data.default.svc.cluster.local
 - Simplified operations (no external etcd management)
 - Cloud-native architecture alignment
 - StatefulSets with stable network identities
-- Rapid deployment without external dependencies
\ No newline at end of file
+- Rapid deployment without external dependencies
+
+### File Mode - Best For
+
+- Development and testing environments
+- Small static clusters (< 10 nodes)
+- Air-gapped deployments without service discovery infrastructure
+- Proof-of-concept and demo setups
+- Environments where node membership is manually managed
+- Scenarios requiring predictable and auditable node configuration
\ No newline at end of file


Reply via email to