Copilot commented on code in PR #928:
URL: 
https://github.com/apache/skywalking-banyandb/pull/928#discussion_r2667382813


##########
banyand/metadata/client.go:
##########
@@ -360,6 +414,10 @@ func (s *clientService) NodeRegistry() schema.Node {
        if s.dnsDiscovery != nil {
                return s.dnsDiscovery
        }
+       // If file discovery is enabled, use it instead of etcd
+       if s.fileDiscovery != nil {
+               return s.fileDiscovery
+       }
        // Otherwise use etcd schema registry
        return s.schemaRegistry

Review Comment:
   NodeRegistry() and RegisterHandler() check the discovery services in 
different orders. RegisterHandler checks file discovery before DNS discovery 
(lines 373-379), while NodeRegistry checks DNS discovery before file discovery 
(lines 414-422). If both were ever set, the two methods would pick different 
services. Consider using one priority order across all methods.
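
   One way to keep the order consistent is to encode it once and call that 
helper from every method. The following is a minimal sketch, not the PR's 
code: `nodeRegistry`, `fakeRegistry`, and `activeDiscovery` are hypothetical 
names, and the DNS-before-file order is an assumption that could just as well 
be reversed.

```go
package main

import "fmt"

// nodeRegistry is a hypothetical stand-in for schema.Node.
type nodeRegistry interface {
	Kind() string
}

// fakeRegistry is a placeholder implementation used only for this sketch.
type fakeRegistry string

func (f fakeRegistry) Kind() string { return string(f) }

// clientService keeps only the fields that matter for the ordering question.
type clientService struct {
	dnsDiscovery   nodeRegistry
	fileDiscovery  nodeRegistry
	schemaRegistry nodeRegistry
}

// activeDiscovery is the single place that encodes the priority order.
// DNS-before-file is an assumption here; the PR may prefer the reverse.
func (s *clientService) activeDiscovery() nodeRegistry {
	if s.dnsDiscovery != nil {
		return s.dnsDiscovery
	}
	if s.fileDiscovery != nil {
		return s.fileDiscovery
	}
	return nil
}

// NodeRegistry falls back to the etcd-backed registry when no discovery is set.
func (s *clientService) NodeRegistry() nodeRegistry {
	if d := s.activeDiscovery(); d != nil {
		return d
	}
	return s.schemaRegistry
}

func main() {
	s := &clientService{
		fileDiscovery:  fakeRegistry("file"),
		schemaRegistry: fakeRegistry("etcd"),
	}
	fmt.Println(s.NodeRegistry().Kind())
}
```

   RegisterHandler could call the same helper, so a later change to the 
priority touches one function only.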



##########
docs/operation/node-discovery.md:
##########
@@ -278,6 +279,102 @@ spec:
 
 This creates DNS SRV record: 
`_grpc._tcp.banyandb-data.default.svc.cluster.local`
 
+## File-Based Discovery
+
+### Overview
+
+File-based discovery provides a simple static configuration approach where 
nodes are defined in a YAML file. This mode is ideal for testing environments, 
small deployments, or scenarios where dynamic service discovery infrastructure 
is unavailable.
+
+The service monitors the configuration file for changes and automatically 
updates the node registry when the file is modified.
+
+### How it Works
+
+1. Read node configurations from a YAML file on startup
+2. Attempt to connect to each node via gRPC to fetch full metadata
+3. Successfully connected nodes are added to the cache
+4. Failed nodes are tracked separately and retried periodically
+5. Watch the file for changes and reload automatically

Review Comment:
   The documentation describes retry behavior for failed nodes (lines 296, 312, 
and 364), but the implementation uses FetchInterval for periodic file reloading 
in the periodicFetch function, not specifically for retrying failed nodes. 
Failed nodes are simply skipped and are attempted again on the next periodic 
file load. Consider clarifying that this interval controls how often the file 
is reloaded and all nodes are re-attempted, rather than implying a dedicated 
retry queue.
   ```suggestion
   The service periodically reloads the configuration file and automatically 
updates the node registry when changes are detected.
   
   ### How it Works
   
   1. Read node configurations from a YAML file on startup
   2. Attempt to connect to each node via gRPC to fetch full metadata
   3. Successfully connected nodes are added to the cache
   4. Nodes that fail to connect are skipped and will be attempted again on the 
next periodic file reload
   5. Reload the file at a configured interval (FetchInterval), reprocessing 
all nodes (including previously failed ones)
   ```



##########
docs/operation/node-discovery.md:
##########
@@ -278,6 +279,102 @@ spec:
 
 This creates DNS SRV record: 
`_grpc._tcp.banyandb-data.default.svc.cluster.local`
 
+## File-Based Discovery
+
+### Overview
+
+File-based discovery provides a simple static configuration approach where 
nodes are defined in a YAML file. This mode is ideal for testing environments, 
small deployments, or scenarios where dynamic service discovery infrastructure 
is unavailable.
+
+The service monitors the configuration file for changes and automatically 
updates the node registry when the file is modified.
+
+### How it Works
+
+1. Read node configurations from a YAML file on startup
+2. Attempt to connect to each node via gRPC to fetch full metadata
+3. Successfully connected nodes are added to the cache
+4. Failed nodes are tracked separately and retried periodically
+5. Watch the file for changes and reload automatically
+6. Notify registered handlers when nodes are added or removed
+
+### Configuration Flags
+
+```shell
+# Mode selection
+--node-discovery-mode=file                    # Enable file mode
+
+# File path (required for file mode)
+--node-discovery-file-path=/path/to/nodes.yaml
+
+# gRPC settings
+--node-discovery-grpc-timeout=5s             # Timeout for metadata fetch 
(default: 5s)
+
+# Retry settings
+--node-discovery-file-retry-interval=20s     # Retry interval for failed nodes 
(default: 20s)

Review Comment:
   The comment on line 312 describes `--node-discovery-file-retry-interval` as 
a retry interval for failed nodes (default: 20s), which is what the flag name 
suggests, but the value is used as the periodic fetch/reload interval for the 
discovery file. The documentation should state that it controls how often the 
file is polled for changes, not only how often failed nodes are retried.
   ```suggestion
   --node-discovery-file-retry-interval=20s     # Interval to poll the 
discovery file and retry failed nodes (default: 20s)
   ```



##########
docs/operation/node-discovery.md:
##########
@@ -278,6 +279,102 @@ spec:
 
 This creates DNS SRV record: 
`_grpc._tcp.banyandb-data.default.svc.cluster.local`
 
+## File-Based Discovery
+
+### Overview
+
+File-based discovery provides a simple static configuration approach where 
nodes are defined in a YAML file. This mode is ideal for testing environments, 
small deployments, or scenarios where dynamic service discovery infrastructure 
is unavailable.
+
+The service monitors the configuration file for changes and automatically 
updates the node registry when the file is modified.
+
+### How it Works
+
+1. Read node configurations from a YAML file on startup
+2. Attempt to connect to each node via gRPC to fetch full metadata
+3. Successfully connected nodes are added to the cache
+4. Failed nodes are tracked separately and retried periodically
+5. Watch the file for changes and reload automatically
+6. Notify registered handlers when nodes are added or removed
+
+### Configuration Flags
+
+```shell
+# Mode selection
+--node-discovery-mode=file                    # Enable file mode
+
+# File path (required for file mode)
+--node-discovery-file-path=/path/to/nodes.yaml
+
+# gRPC settings
+--node-discovery-grpc-timeout=5s             # Timeout for metadata fetch 
(default: 5s)
+
+# Retry settings
+--node-discovery-file-retry-interval=20s     # Retry interval for failed nodes 
(default: 20s)
+```
+
+### YAML Configuration Format
+
+```yaml
+nodes:
+  - name: liaison-0
+    grpc_address: 192.168.1.10:18912
+  - name: data-hot-0
+    grpc_address: 192.168.1.20:17912
+    tls_enabled: true
+    ca_cert_path: /etc/banyandb/certs/ca.crt
+  - name: data-cold-0
+    grpc_address: 192.168.1.30:17912
+```
+
+**Configuration Fields:**
+
+- **name** (required): Identifier for the node
+- **grpc_address** (required): gRPC endpoint in `host:port` format
+- **tls_enabled** (optional): Enable TLS for gRPC connection (default: false)
+- **ca_cert_path** (optional): Path to CA certificate file (required when TLS 
is enabled)
+
+### Configuration Examples
+
+**Basic Configuration:**
+
+```shell
+banyand data \
+  --node-discovery-mode=file \
+  --node-discovery-file-path=/etc/banyandb/nodes.yaml
+```
+
+**With Custom Retry Interval:**
+
+```shell
+banyand liaison \
+  --node-discovery-mode=file \
+  --node-discovery-file-path=/etc/banyandb/nodes.yaml \
+  --node-discovery-file-retry-interval=30s
+```
+
+### Node Lifecycle
+
+#### Initial/Interval Load
+
+When the service starts:
+1. Reads the YAML configuration file
+2. Validates required fields (`grpc_address`)
+3. Attempts gRPC connection to each node
+4. Successfully connected nodes → added to cache
+5. Failed nodes → wait for the next interval

Review Comment:
   The documentation states "Failed nodes → wait for the next interval", which 
is misleading. The implementation doesn't maintain a separate retry queue for 
failed nodes. Instead, on each periodic fetch interval, the entire file is 
reloaded and all nodes (including previously failed ones) are attempted again. 
The term "retry queue" on line 374 is also inaccurate for the same reason.
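
   The behavior the docs should describe can be illustrated with a small 
sketch. This is not the PR's code: `reload`, `fetchFunc`, and `fakeFetch` are 
hypothetical names, and plain strings stand in for node metadata. Like the 
quoted updateNodeCache, it skips addresses that are already cached, so in 
practice only nodes missing from the cache are fetched again on each reload.

```go
package main

import (
	"errors"
	"fmt"
)

// fetchFunc stands in for the gRPC metadata call (fetchNodeMetadata in the PR).
type fetchFunc func(addr string) (string, error)

// reload attempts every address from the file that is not cached yet.
// A failed fetch is skipped and not recorded anywhere, so the next call
// simply tries the address again: there is no retry queue.
func reload(cache map[string]string, addrs []string, fetch fetchFunc) {
	for _, addr := range addrs {
		if _, ok := cache[addr]; ok {
			continue // already discovered on an earlier reload
		}
		meta, err := fetch(addr)
		if err != nil {
			continue // skipped until the next periodic reload
		}
		cache[addr] = meta
	}
}

// fakeFetch builds a fetchFunc that succeeds only for reachable addresses.
func fakeFetch(reachable map[string]bool) fetchFunc {
	return func(addr string) (string, error) {
		if !reachable[addr] {
			return "", errors.New("unreachable: " + addr)
		}
		return "metadata for " + addr, nil
	}
}

func main() {
	addrs := []string{"10.0.0.1:17912", "10.0.0.2:17912"}
	reachable := map[string]bool{"10.0.0.1:17912": true}
	cache := map[string]string{}

	reload(cache, addrs, fakeFetch(reachable))
	fmt.Println("after first reload:", len(cache))

	// The second node comes up before the next interval fires.
	reachable["10.0.0.2:17912"] = true
	reload(cache, addrs, fakeFetch(reachable))
	fmt.Println("after second reload:", len(cache))
}
```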



##########
banyand/metadata/discovery/file/file_test.go:
##########
@@ -0,0 +1,461 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package file
+
+import (
+       "context"
+       "fmt"
+       "net"
+       "os"
+       "path/filepath"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/health"
+       grpc_health_v1 "google.golang.org/grpc/health/grpc_health_v1"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+)
+
+const (
+       testGRPCTimeout   = 2 * time.Second
+       testFetchInterval = 200 * time.Millisecond
+)
+
+func TestNewService(t *testing.T) {
+       t.Run("valid config", func(t *testing.T) {
+               configFile := createTempConfigFile(t, `
+nodes:
+  - name: node1
+    grpc_address: 127.0.0.1:17912
+`)
+               defer os.Remove(configFile)
+
+               svc, err := NewService(Config{
+                       FilePath:      configFile,
+                       GRPCTimeout:   testGRPCTimeout,
+                       FetchInterval: testFetchInterval,
+               })
+               require.NoError(t, err)
+               require.NotNil(t, svc)
+               require.NoError(t, svc.Close())
+       })
+
+       t.Run("empty file path", func(t *testing.T) {
+               _, err := NewService(Config{FilePath: ""})
+               require.Error(t, err)
+               assert.Contains(t, err.Error(), "file path cannot be empty")
+       })
+
+       t.Run("non-existent file", func(t *testing.T) {
+               _, err := NewService(Config{
+                       FilePath:      "/not/exist",
+                       GRPCTimeout:   testGRPCTimeout,
+                       FetchInterval: testFetchInterval,
+               })
+               require.Error(t, err)
+               assert.Contains(t, err.Error(), "failed to access file path")
+       })
+}
+
+func TestStartWithInvalidConfig(t *testing.T) {
+       ctx := context.Background()
+
+       t.Run("invalid yaml", func(t *testing.T) {
+               configFile := createTempConfigFile(t, `
+nodes:
+  - name: node1
+    grpc_address: [invalid
+`)
+               defer os.Remove(configFile)
+
+               svc, err := NewService(Config{
+                       FilePath:      configFile,
+                       GRPCTimeout:   testGRPCTimeout,
+                       FetchInterval: testFetchInterval,
+               })
+               require.NoError(t, err)
+               defer svc.Close()
+
+               err = svc.Start(ctx)
+               require.Error(t, err)
+               assert.Contains(t, err.Error(), "failed to parse YAML")
+       })
+
+       t.Run("missing address", func(t *testing.T) {
+               configFile := createTempConfigFile(t, `
+nodes:
+  - name: node1
+`)
+               defer os.Remove(configFile)
+
+               svc, err := NewService(Config{
+                       FilePath:      configFile,
+                       GRPCTimeout:   testGRPCTimeout,
+                       FetchInterval: testFetchInterval,
+               })
+               require.NoError(t, err)
+               defer svc.Close()
+
+               err = svc.Start(ctx)
+               require.Error(t, err)
+               assert.Contains(t, err.Error(), "missing required field: 
grpc_address")
+       })
+
+       t.Run("tls enabled without ca cert", func(t *testing.T) {
+               configFile := createTempConfigFile(t, `
+nodes:
+  - name: node1
+    grpc_address: 127.0.0.1:17912
+    tls_enabled: true
+`)
+               defer os.Remove(configFile)
+
+               svc, err := NewService(Config{
+                       FilePath:      configFile,
+                       GRPCTimeout:   testGRPCTimeout,
+                       FetchInterval: testFetchInterval,
+               })
+               require.NoError(t, err)
+               defer svc.Close()
+
+               err = svc.Start(ctx)
+               require.Error(t, err)
+               assert.Contains(t, err.Error(), "missing ca_cert_path")
+       })
+}
+
+func TestStartAndCacheNodes(t *testing.T) {
+       listener, grpcServer, nodeServer := startMockGRPCServer(t)
+       defer grpcServer.Stop()
+       defer listener.Close()
+
+       nodeName := "test-node"
+       serverAddr := listener.Addr().String()
+       nodeServer.setNode(newTestNode(nodeName, serverAddr))
+
+       configFile := createTempConfigFile(t, fmt.Sprintf(`
+nodes:
+  - name: %s
+    grpc_address: %s
+`, nodeName, serverAddr))
+       defer os.Remove(configFile)
+
+       svc, err := NewService(Config{
+               FilePath:      configFile,
+               GRPCTimeout:   testGRPCTimeout,
+               FetchInterval: testFetchInterval,
+       })
+       require.NoError(t, err)
+       defer svc.Close()
+
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+       require.NoError(t, svc.Start(ctx))
+
+       nodes, listErr := svc.ListNode(ctx, databasev1.Role_ROLE_UNSPECIFIED)
+       require.NoError(t, listErr)
+       require.Len(t, nodes, 1)
+       assert.Equal(t, nodeName, nodes[0].GetMetadata().GetName())
+       assert.Equal(t, serverAddr, nodes[0].GetGrpcAddress())
+
+       nodeFromCache, getErr := svc.GetNode(ctx, serverAddr)

Review Comment:
   The test passes `serverAddr` (an address like "127.0.0.1:12345") to GetNode, 
but based on the interface contract established by the etcd and DNS 
implementations, GetNode should accept a node name, not an address. The test 
passes only because the file implementation incorrectly uses the address as the 
cache key, so it is asserting incorrect behavior. Once the implementation is 
fixed to look up by name, the test should call GetNode with the node name.
   ```suggestion
        nodeFromCache, getErr := svc.GetNode(ctx, nodeName)
   ```
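
   For reference, a name-keyed cache could look roughly like the sketch below. 
The types are hypothetical and deliberately simplified: `node` stands in for 
`*databasev1.Node`, and the address is kept on the entry on the assumption 
that removal detection still needs to compare against addresses from the file.

```go
package main

import (
	"fmt"
	"sync"
)

// node is a simplified stand-in for *databasev1.Node.
type node struct {
	Name string
	Addr string
}

// nodeCache indexes nodes by name, matching what GetNode receives.
type nodeCache struct {
	mu     sync.RWMutex
	byName map[string]node
}

func newNodeCache() *nodeCache {
	return &nodeCache{byName: make(map[string]node)}
}

// put stores or overwrites the entry for n.Name.
func (c *nodeCache) put(n node) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.byName[n.Name] = n
}

// get looks a node up by name and reports an error when it is absent.
func (c *nodeCache) get(name string) (node, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	n, ok := c.byName[name]
	if !ok {
		return node{}, fmt.Errorf("node %s not found", name)
	}
	return n, nil
}

func main() {
	c := newNodeCache()
	c.put(node{Name: "test-node", Addr: "127.0.0.1:17912"})

	n, err := c.get("test-node")
	fmt.Println(n.Addr, err)

	_, err = c.get("127.0.0.1:17912")
	fmt.Println(err)
}
```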



##########
banyand/metadata/discovery/file/file.go:
##########
@@ -0,0 +1,352 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package file implements file-based node discovery for distributed metadata 
management.
+package file
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "os"
+       "sync"
+       "time"
+
+       "gopkg.in/yaml.v3"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// Service implements file-based node discovery.
+type Service struct {
+       nodeCache     map[string]*databasev1.Node
+       closer        *run.Closer
+       log           *logger.Logger
+       metrics       *metrics
+       handlers      map[string]schema.EventHandler
+       filePath      string
+       grpcTimeout   time.Duration
+       fetchInterval time.Duration
+       cacheMutex    sync.RWMutex
+       handlersMutex sync.RWMutex
+}
+
+// Config holds configuration for file discovery service.
+type Config struct {
+       FilePath      string
+       GRPCTimeout   time.Duration
+       FetchInterval time.Duration
+}
+
+// NodeFileConfig represents the YAML configuration file structure.
+type NodeFileConfig struct {
+       Nodes []NodeConfig `yaml:"nodes"`
+}
+
+// NodeConfig represents a single node configuration.
+type NodeConfig struct {
+       Name       string `yaml:"name"`
+       Address    string `yaml:"grpc_address"`
+       CACertPath string `yaml:"ca_cert_path"`
+       TLSEnabled bool   `yaml:"tls_enabled"`
+}
+
+// NewService creates a new file discovery service.
+func NewService(cfg Config) (*Service, error) {
+       if cfg.FilePath == "" {
+               return nil, errors.New("file path cannot be empty")
+       }
+
+       // validate file exists and is readable
+       if _, err := os.Stat(cfg.FilePath); err != nil {
+               return nil, fmt.Errorf("failed to access file path %s: %w", 
cfg.FilePath, err)
+       }
+
+       svc := &Service{
+               filePath:      cfg.FilePath,
+               nodeCache:     make(map[string]*databasev1.Node),
+               handlers:      make(map[string]schema.EventHandler),
+               closer:        run.NewCloser(1),
+               log:           logger.GetLogger("metadata-discovery-file"),
+               grpcTimeout:   cfg.GRPCTimeout,
+               fetchInterval: cfg.FetchInterval,
+       }
+
+       return svc, nil
+}
+
+// Start begins the file discovery background process.
+func (s *Service) Start(ctx context.Context) error {
+       s.log.Debug().Str("file_path", s.filePath).Msg("Starting file-based 
node discovery service")
+
+       // initial load
+       if err := s.loadAndParseFile(ctx); err != nil {
+               return fmt.Errorf("failed to load initial configuration: %w", 
err)
+       }
+
+       // start periodic fetch loop
+       go s.periodicFetch(ctx)
+
+       return nil
+}
+
+func (s *Service) loadAndParseFile(ctx context.Context) error {
+       startTime := time.Now()
+       var parseErr error
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.fileLoadCount.Inc(1)
+                       s.metrics.fileLoadDuration.Observe(duration.Seconds())
+                       if parseErr != nil {
+                               s.metrics.fileLoadFailedCount.Inc(1)
+                       }
+               }
+       }()
+
+       data, err := os.ReadFile(s.filePath)
+       if err != nil {
+               parseErr = fmt.Errorf("failed to read file: %w", err)
+               return parseErr
+       }
+
+       var cfg NodeFileConfig
+       if err := yaml.Unmarshal(data, &cfg); err != nil {
+               parseErr = fmt.Errorf("failed to parse YAML: %w", err)
+               return parseErr
+       }
+
+       // validate required fields
+       for idx, node := range cfg.Nodes {
+               if node.Address == "" {
+                       parseErr = fmt.Errorf("node %s at index %d is missing 
required field: grpc_address", node.Name, idx)
+                       return parseErr
+               }
+               if node.TLSEnabled && node.CACertPath == "" {
+                       parseErr = fmt.Errorf("node %s at index %d has TLS 
enabled but missing ca_cert_path", node.Name, idx)
+                       return parseErr
+               }
+       }
+
+       // update cache
+       s.updateNodeCache(ctx, cfg.Nodes)
+
+       s.log.Debug().Int("node_count", len(cfg.Nodes)).Msg("Successfully 
loaded configuration file")
+       return nil
+}
+
+func (s *Service) fetchNodeMetadata(ctx context.Context, nodeConfig 
NodeConfig) (*databasev1.Node, error) {
+       ctxTimeout, cancel := context.WithTimeout(ctx, s.grpcTimeout)
+       defer cancel()
+
+       // prepare TLS options
+       dialOpts, err := grpchelper.SecureOptions(nil, nodeConfig.TLSEnabled, 
false, nodeConfig.CACertPath)
+       if err != nil {
+               return nil, fmt.Errorf("failed to load TLS config for node %s: 
%w", nodeConfig.Name, err)
+       }
+
+       // connect to node
+       // nolint:contextcheck
+       conn, connErr := grpchelper.Conn(nodeConfig.Address, s.grpcTimeout, 
dialOpts...)
+       if connErr != nil {
+               return nil, fmt.Errorf("failed to connect to %s: %w", 
nodeConfig.Address, connErr)
+       }
+       defer conn.Close()
+
+       // query metadata of the node
+       client := databasev1.NewNodeQueryServiceClient(conn)
+       resp, callErr := client.GetCurrentNode(ctxTimeout, 
&databasev1.GetCurrentNodeRequest{})
+       if callErr != nil {
+               return nil, fmt.Errorf("failed to get current node from %s: 
%w", nodeConfig.Address, callErr)
+       }
+
+       return resp.GetNode(), nil
+}
+
+func (s *Service) updateNodeCache(ctx context.Context, newNodes []NodeConfig) {
+       for _, n := range newNodes {
+               s.cacheMutex.RLock()
+               _, exists := s.nodeCache[n.Address]
+               s.cacheMutex.RUnlock()
+
+               if !exists {
+                       // fetch node metadata from gRPC
+                       node, fetchErr := s.fetchNodeMetadata(ctx, n)
+                       if fetchErr != nil {
+                               s.log.Warn().
+                                       Err(fetchErr).
+                                       Str("node", n.Name).
+                                       Str("address", n.Address).
+                                       Msg("Failed to fetch node metadata, 
will skip")
+                               continue
+                       }
+
+                       s.cacheMutex.Lock()
+                       if _, alreadyAdded := s.nodeCache[n.Address]; 
!alreadyAdded {
+                               s.nodeCache[n.Address] = node
+
+                               // notify handlers after releasing lock
+                               s.notifyHandlers(schema.Metadata{
+                                       TypeMeta: schema.TypeMeta{
+                                               Kind: schema.KindNode,
+                                               Name: 
node.GetMetadata().GetName(),
+                                       },
+                                       Spec: node,
+                               }, true)
+
+                               s.log.Debug().
+                                       Str("address", n.Address).
+                                       Str("name", 
node.GetMetadata().GetName()).
+                                       Msg("New node discovered and added to 
cache")
+                       }
+                       s.cacheMutex.Unlock()
+               }
+       }
+
+       // collect nodes to delete first
+       allAddr := make(map[string]bool)
+       for _, n := range newNodes {
+               allAddr[n.Address] = true
+       }
+       s.cacheMutex.Lock()
+       nodesToDelete := make(map[string]*databasev1.Node)
+       for addr, node := range s.nodeCache {
+               if !allAddr[addr] {
+                       nodesToDelete[addr] = node
+               }
+       }
+
+       // delete from cache while still holding lock
+       for addr, node := range nodesToDelete {
+               delete(s.nodeCache, addr)
+               s.log.Debug().
+                       Str("address", addr).
+                       Str("name", node.GetMetadata().GetName()).
+                       Msg("Node removed from cache (no longer in file)")
+       }
+       cacheSize := len(s.nodeCache)
+       s.cacheMutex.Unlock()
+
+       // Notify handlers after releasing lock
+       for _, node := range nodesToDelete {
+               s.notifyHandlers(schema.Metadata{
+                       TypeMeta: schema.TypeMeta{
+                               Kind: schema.KindNode,
+                               Name: node.GetMetadata().GetName(),
+                       },
+                       Spec: node,
+               }, false)
+       }
+       // update metrics
+       if s.metrics != nil {
+               s.metrics.totalNodesCount.Set(float64(cacheSize))
+       }
+}
+
+func (s *Service) notifyHandlers(metadata schema.Metadata, isAddOrUpdate bool) 
{
+       s.handlersMutex.RLock()
+       defer s.handlersMutex.RUnlock()
+
+       for _, handler := range s.handlers {
+               if isAddOrUpdate {
+                       handler.OnAddOrUpdate(metadata)
+               } else {
+                       handler.OnDelete(metadata)
+               }
+       }
+}
+
+// RegisterHandler registers an event handler for node changes.
+func (s *Service) RegisterHandler(name string, handler schema.EventHandler) {
+       s.handlersMutex.Lock()
+       defer s.handlersMutex.Unlock()
+
+       s.handlers[name] = handler
+       s.log.Debug().Str("handler", name).Msg("Registered file node discovery 
handler")
+}
+
+// SetMetrics sets the OMR metrics.
+func (s *Service) SetMetrics(factory observability.Factory) {
+       s.metrics = newMetrics(factory)
+}
+
+// Close stops the file discovery service.
+func (s *Service) Close() error {
+       s.closer.Done()
+       s.closer.CloseThenWait()
+
+       return nil
+}
+
+// ListNode lists all existing nodes from cache.
+func (s *Service) ListNode(_ context.Context, role databasev1.Role) 
([]*databasev1.Node, error) {
+       s.cacheMutex.RLock()
+       defer s.cacheMutex.RUnlock()
+
+       var result []*databasev1.Node
+       for _, node := range s.nodeCache {
+               // file mode doesn't support role filtering as roles are not 
stored in config
+               // return all nodes when ROLE_UNSPECIFIED, otherwise return 
empty list
+               if role == databasev1.Role_ROLE_UNSPECIFIED {
+                       result = append(result, node)
+               }
+       }
+
+       return result, nil
+}
+
+// GetNode gets a specific node from cache.
+func (s *Service) GetNode(_ context.Context, nodeName string) 
(*databasev1.Node, error) {
+       s.cacheMutex.RLock()
+       defer s.cacheMutex.RUnlock()
+
+       if node, exists := s.nodeCache[nodeName]; exists {
+               return node, nil
+       }
+
+       return nil, fmt.Errorf("node %s not found", nodeName)
+}
+
+// RegisterNode is not supported in file discovery mode.
+func (s *Service) RegisterNode(_ context.Context, _ *databasev1.Node, _ bool) 
error {
+       return errors.New("manual node registration not supported in file 
discovery mode")
+}
+
+// UpdateNode is not supported in file discovery mode.
+func (s *Service) UpdateNode(_ context.Context, _ *databasev1.Node) error {
+       return errors.New("manual node update not supported in file discovery 
mode")
+}
+
+func (s *Service) periodicFetch(ctx context.Context) {
+       fetchTicker := time.NewTicker(s.fetchInterval)
+
+       for {
+               select {
+               case <-fetchTicker.C:
+                       if err := s.loadAndParseFile(ctx); err != nil {
+                               s.log.Warn().Err(err).Msg("Failed to reload 
configuration file")
+                       }
+               case <-ctx.Done():
+                       return
+               }
+       }

Review Comment:
   The periodicFetch function doesn't check s.closer.CloseNotify() like the DNS 
discovery's discoveryLoop does (dns.go line 250). The loop exits only on 
ctx.Done(), so it might not stop promptly when Close() is called, and the 
ticker is never stopped. Consider adding a case for s.closer.CloseNotify() in 
the select statement to ensure clean shutdown.
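
   A minimal sketch of the loop shape this suggests, under stated assumptions: 
a plain `<-chan struct{}` stands in for the channel returned by 
s.closer.CloseNotify(), `load` stands in for loadAndParseFile, and 
`stopsOnClose` is a hypothetical helper that only exists to exercise the loop.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// periodicFetch runs load on every tick and returns as soon as either the
// close channel is closed or the context is cancelled.
func periodicFetch(ctx context.Context, interval time.Duration, closed <-chan struct{}, load func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release the ticker when the loop exits

	for {
		select {
		case <-ticker.C:
			load()
		case <-closed:
			return
		case <-ctx.Done():
			return
		}
	}
}

// stopsOnClose reports whether the loop exits after the close channel is
// closed, even though the context is never cancelled.
func stopsOnClose() bool {
	closed := make(chan struct{})
	done := make(chan struct{})

	go func() {
		defer close(done)
		periodicFetch(context.Background(), 10*time.Millisecond, closed, func() {})
	}()

	close(closed)
	select {
	case <-done:
		return true
	case <-time.After(time.Second):
		return false
	}
}

func main() {
	fmt.Println("loop stopped on close:", stopsOnClose())
}
```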



##########
banyand/metadata/discovery/file/file.go:
##########
@@ -0,0 +1,352 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package file implements file-based node discovery for distributed metadata 
management.
+package file
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "os"
+       "sync"
+       "time"
+
+       "gopkg.in/yaml.v3"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// Service implements file-based node discovery.
+type Service struct {
+       nodeCache     map[string]*databasev1.Node
+       closer        *run.Closer
+       log           *logger.Logger
+       metrics       *metrics
+       handlers      map[string]schema.EventHandler
+       filePath      string
+       grpcTimeout   time.Duration
+       fetchInterval time.Duration
+       cacheMutex    sync.RWMutex
+       handlersMutex sync.RWMutex
+}
+
+// Config holds configuration for file discovery service.
+type Config struct {
+       FilePath      string
+       GRPCTimeout   time.Duration
+       FetchInterval time.Duration
+}
+
+// NodeFileConfig represents the YAML configuration file structure.
+type NodeFileConfig struct {
+       Nodes []NodeConfig `yaml:"nodes"`
+}
+
+// NodeConfig represents a single node configuration.
+type NodeConfig struct {
+       Name       string `yaml:"name"`
+       Address    string `yaml:"grpc_address"`
+       CACertPath string `yaml:"ca_cert_path"`
+       TLSEnabled bool   `yaml:"tls_enabled"`
+}
+
+// NewService creates a new file discovery service.
+func NewService(cfg Config) (*Service, error) {
+       if cfg.FilePath == "" {
+               return nil, errors.New("file path cannot be empty")
+       }
+
+       // validate file exists and is readable
+       if _, err := os.Stat(cfg.FilePath); err != nil {
+               return nil, fmt.Errorf("failed to access file path %s: %w", 
cfg.FilePath, err)
+       }
+
+       svc := &Service{
+               filePath:      cfg.FilePath,
+               nodeCache:     make(map[string]*databasev1.Node),
+               handlers:      make(map[string]schema.EventHandler),
+               closer:        run.NewCloser(1),
+               log:           logger.GetLogger("metadata-discovery-file"),
+               grpcTimeout:   cfg.GRPCTimeout,
+               fetchInterval: cfg.FetchInterval,
+       }
+
+       return svc, nil
+}
+
+// Start begins the file discovery background process.
+func (s *Service) Start(ctx context.Context) error {
+       s.log.Debug().Str("file_path", s.filePath).Msg("Starting file-based 
node discovery service")
+
+       // initial load
+       if err := s.loadAndParseFile(ctx); err != nil {
+               return fmt.Errorf("failed to load initial configuration: %w", 
err)
+       }
+
+       // start periodic fetch loop
+       go s.periodicFetch(ctx)
+
+       return nil
+}
+
+func (s *Service) loadAndParseFile(ctx context.Context) error {
+       startTime := time.Now()
+       var parseErr error
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.fileLoadCount.Inc(1)
+                       s.metrics.fileLoadDuration.Observe(duration.Seconds())
+                       if parseErr != nil {
+                               s.metrics.fileLoadFailedCount.Inc(1)
+                       }
+               }
+       }()
+
+       data, err := os.ReadFile(s.filePath)
+       if err != nil {
+               parseErr = fmt.Errorf("failed to read file: %w", err)
+               return parseErr
+       }
+
+       var cfg NodeFileConfig
+       if err := yaml.Unmarshal(data, &cfg); err != nil {
+               parseErr = fmt.Errorf("failed to parse YAML: %w", err)
+               return parseErr
+       }
+
+       // validate required fields
+       for idx, node := range cfg.Nodes {
+               if node.Address == "" {
+                       parseErr = fmt.Errorf("node %s at index %d is missing 
required field: grpc_address", node.Name, idx)
+                       return parseErr
+               }
+               if node.TLSEnabled && node.CACertPath == "" {
+                       parseErr = fmt.Errorf("node %s at index %d has TLS 
enabled but missing ca_cert_path", node.Name, idx)
+                       return parseErr
+               }
+       }
+
+       // update cache
+       s.updateNodeCache(ctx, cfg.Nodes)
+
+       s.log.Debug().Int("node_count", len(cfg.Nodes)).Msg("Successfully 
loaded configuration file")
+       return nil
+}
+
+func (s *Service) fetchNodeMetadata(ctx context.Context, nodeConfig 
NodeConfig) (*databasev1.Node, error) {
+       ctxTimeout, cancel := context.WithTimeout(ctx, s.grpcTimeout)
+       defer cancel()
+
+       // prepare TLS options
+       dialOpts, err := grpchelper.SecureOptions(nil, nodeConfig.TLSEnabled, 
false, nodeConfig.CACertPath)
+       if err != nil {
+               return nil, fmt.Errorf("failed to load TLS config for node %s: 
%w", nodeConfig.Name, err)
+       }
+
+       // connect to node
+       // nolint:contextcheck
+       conn, connErr := grpchelper.Conn(nodeConfig.Address, s.grpcTimeout, 
dialOpts...)
+       if connErr != nil {
+               return nil, fmt.Errorf("failed to connect to %s: %w", 
nodeConfig.Address, connErr)
+       }
+       defer conn.Close()
+
+       // query metadata of the node
+       client := databasev1.NewNodeQueryServiceClient(conn)
+       resp, callErr := client.GetCurrentNode(ctxTimeout, 
&databasev1.GetCurrentNodeRequest{})
+       if callErr != nil {
+               return nil, fmt.Errorf("failed to get current node from %s: 
%w", nodeConfig.Address, callErr)
+       }
+
+       return resp.GetNode(), nil
+}
+
+func (s *Service) updateNodeCache(ctx context.Context, newNodes []NodeConfig) {
+       for _, n := range newNodes {
+               s.cacheMutex.RLock()
+               _, exists := s.nodeCache[n.Address]
+               s.cacheMutex.RUnlock()
+
+               if !exists {
+                       // fetch node metadata from gRPC
+                       node, fetchErr := s.fetchNodeMetadata(ctx, n)
+                       if fetchErr != nil {
+                               s.log.Warn().
+                                       Err(fetchErr).
+                                       Str("node", n.Name).
+                                       Str("address", n.Address).
+                                       Msg("Failed to fetch node metadata, 
will skip")
+                               continue
+                       }
+
+                       s.cacheMutex.Lock()
+                       if _, alreadyAdded := s.nodeCache[n.Address]; 
!alreadyAdded {
+                               s.nodeCache[n.Address] = node
+
+                               // notify handlers after releasing lock
+                               s.notifyHandlers(schema.Metadata{
+                                       TypeMeta: schema.TypeMeta{
+                                               Kind: schema.KindNode,
+                                               Name: 
node.GetMetadata().GetName(),
+                                       },
+                                       Spec: node,
+                               }, true)
+
+                               s.log.Debug().
+                                       Str("address", n.Address).
+                                       Str("name", 
node.GetMetadata().GetName()).
+                                       Msg("New node discovered and added to 
cache")
+                       }
+                       s.cacheMutex.Unlock()
+               }
+       }
+
+       // collect nodes to delete first
+       allAddr := make(map[string]bool)
+       for _, n := range newNodes {
+               allAddr[n.Address] = true
+       }
+       s.cacheMutex.Lock()
+       nodesToDelete := make(map[string]*databasev1.Node)
+       for addr, node := range s.nodeCache {
+               if !allAddr[addr] {
+                       nodesToDelete[addr] = node
+               }
+       }
+
+       // delete from cache while still holding lock
+       for addr, node := range nodesToDelete {
+               delete(s.nodeCache, addr)
+               s.log.Debug().
+                       Str("address", addr).
+                       Str("name", node.GetMetadata().GetName()).
+                       Msg("Node removed from cache (no longer in file)")
+       }
+       cacheSize := len(s.nodeCache)
+       s.cacheMutex.Unlock()
+
+       // Notify handlers after releasing lock
+       for _, node := range nodesToDelete {
+               s.notifyHandlers(schema.Metadata{
+                       TypeMeta: schema.TypeMeta{
+                               Kind: schema.KindNode,
+                               Name: node.GetMetadata().GetName(),
+                       },
+                       Spec: node,
+               }, false)
+       }
+       // update metrics
+       if s.metrics != nil {
+               s.metrics.totalNodesCount.Set(float64(cacheSize))
+       }
+}
+
+func (s *Service) notifyHandlers(metadata schema.Metadata, isAddOrUpdate bool) 
{
+       s.handlersMutex.RLock()
+       defer s.handlersMutex.RUnlock()
+
+       for _, handler := range s.handlers {
+               if isAddOrUpdate {
+                       handler.OnAddOrUpdate(metadata)
+               } else {
+                       handler.OnDelete(metadata)
+               }
+       }
+}
+
+// RegisterHandler registers an event handler for node changes.
+func (s *Service) RegisterHandler(name string, handler schema.EventHandler) {
+       s.handlersMutex.Lock()
+       defer s.handlersMutex.Unlock()
+
+       s.handlers[name] = handler
+       s.log.Debug().Str("handler", name).Msg("Registered file node discovery 
handler")
+}
+
+// SetMetrics sets the OMR metrics.
+func (s *Service) SetMetrics(factory observability.Factory) {
+       s.metrics = newMetrics(factory)
+}
+
+// Close stops the file discovery service.
+func (s *Service) Close() error {
+       s.closer.Done()
+       s.closer.CloseThenWait()
+
+       return nil
+}
+
+// ListNode lists all existing nodes from cache.
+func (s *Service) ListNode(_ context.Context, role databasev1.Role) 
([]*databasev1.Node, error) {
+       s.cacheMutex.RLock()
+       defer s.cacheMutex.RUnlock()
+
+       var result []*databasev1.Node
+       for _, node := range s.nodeCache {
+               // file mode doesn't support role filtering as roles are not 
stored in config
+               // return all nodes when ROLE_UNSPECIFIED, otherwise return 
empty list
+               if role == databasev1.Role_ROLE_UNSPECIFIED {
+                       result = append(result, node)
+               }
+       }
+
+       return result, nil
+}
+
+// GetNode gets a specific node from cache.
+func (s *Service) GetNode(_ context.Context, nodeName string) 
(*databasev1.Node, error) {
+       s.cacheMutex.RLock()
+       defer s.cacheMutex.RUnlock()
+
+       if node, exists := s.nodeCache[nodeName]; exists {
+               return node, nil
+       }
+
+       return nil, fmt.Errorf("node %s not found", nodeName)
+}
+
+// RegisterNode is not supported in file discovery mode.
+func (s *Service) RegisterNode(_ context.Context, _ *databasev1.Node, _ bool) 
error {
+       return errors.New("manual node registration not supported in file 
discovery mode")
+}
+
+// UpdateNode is not supported in file discovery mode.
+func (s *Service) UpdateNode(_ context.Context, _ *databasev1.Node) error {
+       return errors.New("manual node update not supported in file discovery 
mode")
+}
+
+func (s *Service) periodicFetch(ctx context.Context) {
+       fetchTicker := time.NewTicker(s.fetchInterval)

Review Comment:
   The ticker created in periodicFetch is never stopped. The goroutine exits 
when the context is done, but on Go versions before 1.23 an unstopped ticker is 
not garbage collected, so its resources leak. Stopping it explicitly is also 
the conventional pattern. Add defer fetchTicker.Stop() right after creating the 
ticker.
   ```suggestion
        fetchTicker := time.NewTicker(s.fetchInterval)
        defer fetchTicker.Stop()
   ```
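
For context, a minimal, self-contained sketch of the loop shape this suggestion implies. The body of periodicFetch is not visible in this hunk, so `runPeriodic`, `countReloads`, and the `reload` callback are hypothetical names used only for illustration, not the PR's code.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runPeriodic calls reload on every tick until ctx is cancelled.
// The deferred Stop releases the ticker as soon as the loop returns.
// (Hypothetical helper; the real periodicFetch body is not shown in the diff.)
func runPeriodic(ctx context.Context, interval time.Duration, reload func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			reload()
		case <-ctx.Done():
			return
		}
	}
}

// countReloads runs the loop for about runForMs milliseconds and reports
// how many times reload fired. Plain ints keep the call site simple.
func countReloads(intervalMs, runForMs int) int {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(runForMs)*time.Millisecond)
	defer cancel()

	count := 0
	// runPeriodic is called synchronously, so count needs no locking here.
	runPeriodic(ctx, time.Duration(intervalMs)*time.Millisecond, func() { count++ })
	return count
}

func main() {
	fmt.Println("reloads:", countReloads(10, 105))
}
```

The exact number of reloads depends on scheduling, so the sketch only prints it. The point is that `defer ticker.Stop()` sits directly after `time.NewTicker`, so every exit path releases the ticker.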



##########
docs/operation/node-discovery.md:
##########
@@ -278,6 +279,102 @@ spec:
 
 This creates DNS SRV record: 
`_grpc._tcp.banyandb-data.default.svc.cluster.local`
 
+## File-Based Discovery
+
+### Overview
+
+File-based discovery provides a simple static configuration approach where 
nodes are defined in a YAML file. This mode is ideal for testing environments, 
small deployments, or scenarios where dynamic service discovery infrastructure 
is unavailable.
+
+The service monitors the configuration file for changes and automatically 
updates the node registry when the file is modified.
+
+### How it Works
+
+1. Read node configurations from a YAML file on startup
+2. Attempt to connect to each node via gRPC to fetch full metadata
+3. Successfully connected nodes are added to the cache
+4. Failed nodes are tracked separately and retried periodically
+5. Watch the file for changes and reload automatically
+6. Notify registered handlers when nodes are added or removed
+
+### Configuration Flags
+
+```shell
+# Mode selection
+--node-discovery-mode=file                    # Enable file mode
+
+# File path (required for file mode)
+--node-discovery-file-path=/path/to/nodes.yaml
+
+# gRPC settings
+--node-discovery-grpc-timeout=5s             # Timeout for metadata fetch 
(default: 5s)
+
+# Retry settings
+--node-discovery-file-retry-interval=20s     # Retry interval for failed nodes 
(default: 20s)
+```
+
+### YAML Configuration Format
+
+```yaml
+nodes:
+  - name: liaison-0
+    grpc_address: 192.168.1.10:18912
+  - name: data-hot-0
+    grpc_address: 192.168.1.20:17912
+    tls_enabled: true
+    ca_cert_path: /etc/banyandb/certs/ca.crt
+  - name: data-cold-0
+    grpc_address: 192.168.1.30:17912
+```
+
+**Configuration Fields:**
+
+- **name** (required): Identifier for the node
+- **grpc_address** (required): gRPC endpoint in `host:port` format
+- **tls_enabled** (optional): Enable TLS for gRPC connection (default: false)
+- **ca_cert_path** (optional): Path to CA certificate file (required when TLS 
is enabled)
+
+### Configuration Examples
+
+**Basic Configuration:**
+
+```shell
+banyand data \
+  --node-discovery-mode=file \
+  --node-discovery-file-path=/etc/banyandb/nodes.yaml
+```
+
+**With Custom Retry Interval:**
+
+```shell
+banyand liaison \
+  --node-discovery-mode=file \
+  --node-discovery-file-path=/etc/banyandb/nodes.yaml \
+  --node-discovery-file-retry-interval=30s
+```
+
+### Node Lifecycle
+
+#### Initial/Interval Load
+
+When the service starts:
+1. Reads the YAML configuration file
+2. Validates required fields (`grpc_address`)
+3. Attempts gRPC connection to each node
+4. Successfully connected nodes → added to cache
+5. Failed nodes → wait for the next interval
+
+### Error Handling
+
+**Startup Errors:**
+- Missing or invalid file path → service fails to start
+- Invalid YAML format → service fails to start
+- Missing required fields → service fails to start
+
+**Runtime Errors:**
+- gRPC connection failure → node added to retry queue

Review Comment:
   The documentation says "node added to retry queue", but the implementation 
in file.go has no retry queue. updateNodeCache simply skips a node whose 
metadata fetch fails, and the node is tried again when the file is reloaded on 
the next fetch interval. Reword this line to match the actual behavior.
   ```suggestion
   - gRPC connection failure → node skipped; retried on next file reload
   ```
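
The skip-then-retry behavior described above can be sketched in isolation. This is a simplified model, not the PR's code: `reconcile`, `fetchFunc`, and `flakyFetch` are hypothetical names, and a string stands in for the node metadata returned over gRPC.

```go
package main

import (
	"errors"
	"fmt"
)

// fetchFunc stands in for the gRPC metadata call (hypothetical).
type fetchFunc func(addr string) (string, error)

// reconcile tries every configured address that is not cached yet.
// A failed fetch is only skipped: nothing is queued, so the address is
// simply attempted again the next time reconcile runs.
func reconcile(cache map[string]string, addrs []string, fetch fetchFunc) (skipped []string) {
	for _, addr := range addrs {
		if _, ok := cache[addr]; ok {
			continue // already known, no fetch needed
		}
		name, err := fetch(addr)
		if err != nil {
			skipped = append(skipped, addr)
			continue
		}
		cache[addr] = name
	}
	return skipped
}

// flakyFetch fails the first attempt for each address and succeeds afterwards.
func flakyFetch() fetchFunc {
	attempts := map[string]int{}
	return func(addr string) (string, error) {
		attempts[addr]++
		if attempts[addr] == 1 {
			return "", errors.New("connection refused")
		}
		return "node@" + addr, nil
	}
}

func main() {
	cache := map[string]string{}
	addrs := []string{"192.168.1.10:18912", "192.168.1.20:17912"}
	fetch := flakyFetch()

	fmt.Println("first reload skipped:", reconcile(cache, addrs, fetch))
	fmt.Println("second reload skipped:", reconcile(cache, addrs, fetch))
	fmt.Println("cached nodes:", len(cache))
}
```

Because a failed node never enters the cache, the "not cached yet" check is what drives the retry. No separate queue or retry timer is involved, which is why the wording "retried on next file reload" fits better.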



##########
banyand/metadata/discovery/file/file.go:
##########
@@ -0,0 +1,352 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package file implements file-based node discovery for distributed metadata 
management.
+package file
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "os"
+       "sync"
+       "time"
+
+       "gopkg.in/yaml.v3"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// Service implements file-based node discovery.
+type Service struct {
+       nodeCache     map[string]*databasev1.Node
+       closer        *run.Closer
+       log           *logger.Logger
+       metrics       *metrics
+       handlers      map[string]schema.EventHandler
+       filePath      string
+       grpcTimeout   time.Duration
+       fetchInterval time.Duration
+       cacheMutex    sync.RWMutex
+       handlersMutex sync.RWMutex
+}
+
+// Config holds configuration for file discovery service.
+type Config struct {
+       FilePath      string
+       GRPCTimeout   time.Duration
+       FetchInterval time.Duration
+}
+
+// NodeFileConfig represents the YAML configuration file structure.
+type NodeFileConfig struct {
+       Nodes []NodeConfig `yaml:"nodes"`
+}
+
+// NodeConfig represents a single node configuration.
+type NodeConfig struct {
+       Name       string `yaml:"name"`
+       Address    string `yaml:"grpc_address"`
+       CACertPath string `yaml:"ca_cert_path"`
+       TLSEnabled bool   `yaml:"tls_enabled"`
+}
+
+// NewService creates a new file discovery service.
+func NewService(cfg Config) (*Service, error) {
+       if cfg.FilePath == "" {
+               return nil, errors.New("file path cannot be empty")
+       }
+
+       // validate file exists and is readable
+       if _, err := os.Stat(cfg.FilePath); err != nil {
+               return nil, fmt.Errorf("failed to access file path %s: %w", 
cfg.FilePath, err)
+       }
+
+       svc := &Service{
+               filePath:      cfg.FilePath,
+               nodeCache:     make(map[string]*databasev1.Node),
+               handlers:      make(map[string]schema.EventHandler),
+               closer:        run.NewCloser(1),
+               log:           logger.GetLogger("metadata-discovery-file"),
+               grpcTimeout:   cfg.GRPCTimeout,
+               fetchInterval: cfg.FetchInterval,
+       }
+
+       return svc, nil
+}
+
+// Start begins the file discovery background process.
+func (s *Service) Start(ctx context.Context) error {
+       s.log.Debug().Str("file_path", s.filePath).Msg("Starting file-based 
node discovery service")
+
+       // initial load
+       if err := s.loadAndParseFile(ctx); err != nil {
+               return fmt.Errorf("failed to load initial configuration: %w", 
err)
+       }
+
+       // start periodic fetch loop
+       go s.periodicFetch(ctx)
+
+       return nil
+}
+
+func (s *Service) loadAndParseFile(ctx context.Context) error {
+       startTime := time.Now()
+       var parseErr error
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.fileLoadCount.Inc(1)
+                       s.metrics.fileLoadDuration.Observe(duration.Seconds())
+                       if parseErr != nil {
+                               s.metrics.fileLoadFailedCount.Inc(1)
+                       }
+               }
+       }()
+
+       data, err := os.ReadFile(s.filePath)
+       if err != nil {
+               parseErr = fmt.Errorf("failed to read file: %w", err)
+               return parseErr
+       }
+
+       var cfg NodeFileConfig
+       if err := yaml.Unmarshal(data, &cfg); err != nil {
+               parseErr = fmt.Errorf("failed to parse YAML: %w", err)
+               return parseErr
+       }
+
+       // validate required fields
+       for idx, node := range cfg.Nodes {
+               if node.Address == "" {
+                       parseErr = fmt.Errorf("node %s at index %d is missing 
required field: grpc_address", node.Name, idx)
+                       return parseErr
+               }
+               if node.TLSEnabled && node.CACertPath == "" {
+                       parseErr = fmt.Errorf("node %s at index %d has TLS 
enabled but missing ca_cert_path", node.Name, idx)
+                       return parseErr
+               }
+       }
+
+       // update cache
+       s.updateNodeCache(ctx, cfg.Nodes)
+
+       s.log.Debug().Int("node_count", len(cfg.Nodes)).Msg("Successfully 
loaded configuration file")
+       return nil
+}
+
+func (s *Service) fetchNodeMetadata(ctx context.Context, nodeConfig 
NodeConfig) (*databasev1.Node, error) {
+       ctxTimeout, cancel := context.WithTimeout(ctx, s.grpcTimeout)
+       defer cancel()
+
+       // prepare TLS options
+       dialOpts, err := grpchelper.SecureOptions(nil, nodeConfig.TLSEnabled, 
false, nodeConfig.CACertPath)
+       if err != nil {
+               return nil, fmt.Errorf("failed to load TLS config for node %s: 
%w", nodeConfig.Name, err)
+       }
+
+       // connect to node
+       // nolint:contextcheck
+       conn, connErr := grpchelper.Conn(nodeConfig.Address, s.grpcTimeout, 
dialOpts...)
+       if connErr != nil {
+               return nil, fmt.Errorf("failed to connect to %s: %w", 
nodeConfig.Address, connErr)
+       }
+       defer conn.Close()
+
+       // query metadata of the node
+       client := databasev1.NewNodeQueryServiceClient(conn)
+       resp, callErr := client.GetCurrentNode(ctxTimeout, 
&databasev1.GetCurrentNodeRequest{})
+       if callErr != nil {
+               return nil, fmt.Errorf("failed to get current node from %s: 
%w", nodeConfig.Address, callErr)
+       }
+
+       return resp.GetNode(), nil
+}
+
+func (s *Service) updateNodeCache(ctx context.Context, newNodes []NodeConfig) {
+       for _, n := range newNodes {
+               s.cacheMutex.RLock()
+               _, exists := s.nodeCache[n.Address]
+               s.cacheMutex.RUnlock()
+
+               if !exists {
+                       // fetch node metadata from gRPC
+                       node, fetchErr := s.fetchNodeMetadata(ctx, n)
+                       if fetchErr != nil {
+                               s.log.Warn().
+                                       Err(fetchErr).
+                                       Str("node", n.Name).
+                                       Str("address", n.Address).
+                                       Msg("Failed to fetch node metadata, 
will skip")
+                               continue
+                       }
+
+                       s.cacheMutex.Lock()
+                       if _, alreadyAdded := s.nodeCache[n.Address]; 
!alreadyAdded {
+                               s.nodeCache[n.Address] = node
+
+                               // notify handlers after releasing lock
+                               s.notifyHandlers(schema.Metadata{
+                                       TypeMeta: schema.TypeMeta{
+                                               Kind: schema.KindNode,
+                                               Name: 
node.GetMetadata().GetName(),
+                                       },
+                                       Spec: node,
+                               }, true)
+
+                               s.log.Debug().
+                                       Str("address", n.Address).
+                                       Str("name", 
node.GetMetadata().GetName()).
+                                       Msg("New node discovered and added to 
cache")
+                       }
+                       s.cacheMutex.Unlock()
+               }
+       }
+
+       // collect nodes to delete first
+       allAddr := make(map[string]bool)
+       for _, n := range newNodes {
+               allAddr[n.Address] = true
+       }
+       s.cacheMutex.Lock()
+       nodesToDelete := make(map[string]*databasev1.Node)
+       for addr, node := range s.nodeCache {
+               if !allAddr[addr] {
+                       nodesToDelete[addr] = node
+               }
+       }
+
+       // delete from cache while still holding lock
+       for addr, node := range nodesToDelete {
+               delete(s.nodeCache, addr)
+               s.log.Debug().
+                       Str("address", addr).
+                       Str("name", node.GetMetadata().GetName()).
+                       Msg("Node removed from cache (no longer in file)")
+       }
+       cacheSize := len(s.nodeCache)
+       s.cacheMutex.Unlock()
+
+       // Notify handlers after releasing lock
+       for _, node := range nodesToDelete {
+               s.notifyHandlers(schema.Metadata{
+                       TypeMeta: schema.TypeMeta{
+                               Kind: schema.KindNode,
+                               Name: node.GetMetadata().GetName(),
+                       },
+                       Spec: node,
+               }, false)
+       }
+       // update metrics
+       if s.metrics != nil {
+               s.metrics.totalNodesCount.Set(float64(cacheSize))
+       }
+}
+
+func (s *Service) notifyHandlers(metadata schema.Metadata, isAddOrUpdate bool) 
{
+       s.handlersMutex.RLock()
+       defer s.handlersMutex.RUnlock()
+
+       for _, handler := range s.handlers {
+               if isAddOrUpdate {
+                       handler.OnAddOrUpdate(metadata)
+               } else {
+                       handler.OnDelete(metadata)
+               }
+       }
+}
+
+// RegisterHandler registers an event handler for node changes.
+func (s *Service) RegisterHandler(name string, handler schema.EventHandler) {
+       s.handlersMutex.Lock()
+       defer s.handlersMutex.Unlock()
+
+       s.handlers[name] = handler
+       s.log.Debug().Str("handler", name).Msg("Registered file node discovery 
handler")
+}
+
+// SetMetrics sets the OMR metrics.
+func (s *Service) SetMetrics(factory observability.Factory) {
+       s.metrics = newMetrics(factory)
+}
+
+// Close stops the file discovery service.
+func (s *Service) Close() error {
+       s.closer.Done()
+       s.closer.CloseThenWait()
+
+       return nil
+}
+
+// ListNode lists all existing nodes from cache.
+func (s *Service) ListNode(_ context.Context, role databasev1.Role) 
([]*databasev1.Node, error) {
+       s.cacheMutex.RLock()
+       defer s.cacheMutex.RUnlock()
+
+       var result []*databasev1.Node
+       for _, node := range s.nodeCache {
+               // file mode doesn't support role filtering as roles are not 
stored in config
+               // return all nodes when ROLE_UNSPECIFIED, otherwise return 
empty list
+               if role == databasev1.Role_ROLE_UNSPECIFIED {
+                       result = append(result, node)
+               }
+       }
+
+       return result, nil
+}
+
+// GetNode gets a specific node from cache.
+func (s *Service) GetNode(_ context.Context, nodeName string) 
(*databasev1.Node, error) {
+       s.cacheMutex.RLock()
+       defer s.cacheMutex.RUnlock()
+
+       if node, exists := s.nodeCache[nodeName]; exists {
+               return node, nil
+       }
+
+       return nil, fmt.Errorf("node %s not found", nodeName)

Review Comment:
   Semantic mismatch between the parameter name and the cache key. GetNode 
takes a parameter named "nodeName", but nodeCache is keyed by the node's 
address (grpc_address), so the function actually looks up by address, not by 
name. The "node %s not found" error message is the same as in GetNode in dns.go 
(line 604), so the message itself is consistent; the parameter name is what 
misleads. Consider renaming the parameter so it reflects the value it holds.
   ```suggestion
   func (s *Service) GetNode(_ context.Context, nodeAddress string) 
(*databasev1.Node, error) {
        s.cacheMutex.RLock()
        defer s.cacheMutex.RUnlock()
   
        if node, exists := s.nodeCache[nodeAddress]; exists {
                return node, nil
        }
   
        return nil, fmt.Errorf("node %s not found", nodeAddress)
   ```
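
To make the mismatch concrete, here is a small sketch of a cache keyed by address, with separate lookups by address and by name. The `node` and `registry` types and their methods are hypothetical stand-ins for illustration. Whether callers of GetNode pass a name or an address is not visible in this diff, so this is not a proposed fix.

```go
package main

import "fmt"

// node is a simplified stand-in for databasev1.Node (hypothetical).
type node struct {
	Name    string
	Address string
}

// registry mirrors a cache keyed by gRPC address.
type registry struct {
	byAddress map[string]*node
}

func newRegistry(nodes ...*node) *registry {
	r := &registry{byAddress: make(map[string]*node)}
	for _, n := range nodes {
		r.byAddress[n.Address] = n
	}
	return r
}

// getByAddress is a direct map lookup on the cache key.
func (r *registry) getByAddress(addr string) (*node, error) {
	if n, ok := r.byAddress[addr]; ok {
		return n, nil
	}
	return nil, fmt.Errorf("node with address %s not found", addr)
}

// getByName scans the values, because the map is not keyed by name.
func (r *registry) getByName(name string) (*node, error) {
	for _, n := range r.byAddress {
		if n.Name == name {
			return n, nil
		}
	}
	return nil, fmt.Errorf("node with name %s not found", name)
}

func main() {
	r := newRegistry(&node{Name: "data-hot-0", Address: "192.168.1.20:17912"})

	// Passing a name to the address-keyed lookup fails.
	_, err := r.getByAddress("data-hot-0")
	fmt.Println("name passed to address lookup:", err)

	// Scanning by name finds the node.
	n, _ := r.getByName("data-hot-0")
	fmt.Println("found by name, address:", n.Address)
}
```

If callers do pass node names, a rename alone would not be enough and the lookup itself would need to change, for example with a scan like `getByName` or a second index keyed by name.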



##########
banyand/metadata/discovery/file/file.go:
##########
@@ -0,0 +1,352 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package file implements file-based node discovery for distributed metadata 
management.
+package file
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "os"
+       "sync"
+       "time"
+
+       "gopkg.in/yaml.v3"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// Service implements file-based node discovery.
+type Service struct {
+       nodeCache     map[string]*databasev1.Node
+       closer        *run.Closer
+       log           *logger.Logger
+       metrics       *metrics
+       handlers      map[string]schema.EventHandler
+       filePath      string
+       grpcTimeout   time.Duration
+       fetchInterval time.Duration
+       cacheMutex    sync.RWMutex
+       handlersMutex sync.RWMutex
+}
+
+// Config holds configuration for file discovery service.
+type Config struct {
+       FilePath      string
+       GRPCTimeout   time.Duration
+       FetchInterval time.Duration
+}
+
+// NodeFileConfig represents the YAML configuration file structure.
+type NodeFileConfig struct {
+       Nodes []NodeConfig `yaml:"nodes"`
+}
+
+// NodeConfig represents a single node configuration.
+type NodeConfig struct {
+       Name       string `yaml:"name"`
+       Address    string `yaml:"grpc_address"`
+       CACertPath string `yaml:"ca_cert_path"`
+       TLSEnabled bool   `yaml:"tls_enabled"`
+}
+
+// NewService creates a new file discovery service.
+func NewService(cfg Config) (*Service, error) {
+       if cfg.FilePath == "" {
+               return nil, errors.New("file path cannot be empty")
+       }
+
+       // validate file exists and is readable
+       if _, err := os.Stat(cfg.FilePath); err != nil {
+               return nil, fmt.Errorf("failed to access file path %s: %w", 
cfg.FilePath, err)
+       }
+
+       svc := &Service{
+               filePath:      cfg.FilePath,
+               nodeCache:     make(map[string]*databasev1.Node),
+               handlers:      make(map[string]schema.EventHandler),
+               closer:        run.NewCloser(1),
+               log:           logger.GetLogger("metadata-discovery-file"),
+               grpcTimeout:   cfg.GRPCTimeout,
+               fetchInterval: cfg.FetchInterval,
+       }
+
+       return svc, nil
+}
+
+// Start begins the file discovery background process.
+func (s *Service) Start(ctx context.Context) error {
+       s.log.Debug().Str("file_path", s.filePath).Msg("Starting file-based 
node discovery service")
+
+       // initial load
+       if err := s.loadAndParseFile(ctx); err != nil {
+               return fmt.Errorf("failed to load initial configuration: %w", 
err)
+       }
+
+       // start periodic fetch loop
+       go s.periodicFetch(ctx)
+
+       return nil
+}
+
+func (s *Service) loadAndParseFile(ctx context.Context) error {
+       startTime := time.Now()
+       var parseErr error
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.fileLoadCount.Inc(1)
+                       s.metrics.fileLoadDuration.Observe(duration.Seconds())
+                       if parseErr != nil {
+                               s.metrics.fileLoadFailedCount.Inc(1)
+                       }
+               }
+       }()
+
+       data, err := os.ReadFile(s.filePath)
+       if err != nil {
+               parseErr = fmt.Errorf("failed to read file: %w", err)
+               return parseErr
+       }
+
+       var cfg NodeFileConfig
+       if err := yaml.Unmarshal(data, &cfg); err != nil {
+               parseErr = fmt.Errorf("failed to parse YAML: %w", err)
+               return parseErr
+       }
+
+       // validate required fields
+       for idx, node := range cfg.Nodes {
+               if node.Address == "" {
+                       parseErr = fmt.Errorf("node %s at index %d is missing 
required field: grpc_address", node.Name, idx)
+                       return parseErr
+               }
+               if node.TLSEnabled && node.CACertPath == "" {
+                       parseErr = fmt.Errorf("node %s at index %d has TLS 
enabled but missing ca_cert_path", node.Name, idx)
+                       return parseErr
+               }
+       }
+
+       // update cache
+       s.updateNodeCache(ctx, cfg.Nodes)
+
+       s.log.Debug().Int("node_count", len(cfg.Nodes)).Msg("Successfully 
loaded configuration file")
+       return nil
+}
+
+func (s *Service) fetchNodeMetadata(ctx context.Context, nodeConfig 
NodeConfig) (*databasev1.Node, error) {
+       ctxTimeout, cancel := context.WithTimeout(ctx, s.grpcTimeout)
+       defer cancel()
+
+       // prepare TLS options
+       dialOpts, err := grpchelper.SecureOptions(nil, nodeConfig.TLSEnabled, 
false, nodeConfig.CACertPath)
+       if err != nil {
+               return nil, fmt.Errorf("failed to load TLS config for node %s: 
%w", nodeConfig.Name, err)
+       }
+
+       // connect to node
+       // nolint:contextcheck
+       conn, connErr := grpchelper.Conn(nodeConfig.Address, s.grpcTimeout, 
dialOpts...)
+       if connErr != nil {
+               return nil, fmt.Errorf("failed to connect to %s: %w", 
nodeConfig.Address, connErr)
+       }
+       defer conn.Close()
+
+       // query metadata of the node
+       client := databasev1.NewNodeQueryServiceClient(conn)
+       resp, callErr := client.GetCurrentNode(ctxTimeout, 
&databasev1.GetCurrentNodeRequest{})
+       if callErr != nil {
+               return nil, fmt.Errorf("failed to get current node from %s: 
%w", nodeConfig.Address, callErr)
+       }
+
+       return resp.GetNode(), nil
+}
+
+func (s *Service) updateNodeCache(ctx context.Context, newNodes []NodeConfig) {
+       for _, n := range newNodes {
+               s.cacheMutex.RLock()
+               _, exists := s.nodeCache[n.Address]
+               s.cacheMutex.RUnlock()
+
+               if !exists {
+                       // fetch node metadata from gRPC
+                       node, fetchErr := s.fetchNodeMetadata(ctx, n)
+                       if fetchErr != nil {
+                               s.log.Warn().
+                                       Err(fetchErr).
+                                       Str("node", n.Name).
+                                       Str("address", n.Address).
+                                       Msg("Failed to fetch node metadata, 
will skip")
+                               continue
+                       }
+
+                       s.cacheMutex.Lock()
+                       if _, alreadyAdded := s.nodeCache[n.Address]; 
!alreadyAdded {
+                               s.nodeCache[n.Address] = node
+
+                               // notify handlers after releasing lock
+                               s.notifyHandlers(schema.Metadata{
+                                       TypeMeta: schema.TypeMeta{
+                                               Kind: schema.KindNode,
+                                               Name: 
node.GetMetadata().GetName(),
+                                       },
+                                       Spec: node,
+                               }, true)
+
+                               s.log.Debug().
+                                       Str("address", n.Address).
+                                       Str("name", 
node.GetMetadata().GetName()).
+                                       Msg("New node discovered and added to 
cache")
+                       }
+                       s.cacheMutex.Unlock()

Review Comment:
   The comment on line 207 says "notify handlers after releasing lock", but 
notifyHandlers is actually called while cacheMutex.Lock() is still held 
(acquired on line 203, released on line 221). This is a concurrency issue: a 
handler that blocks, or that calls back into a method acquiring the same lock, 
can cause deadlocks or performance problems. The implementation should follow 
the comment's intent and call notifyHandlers after releasing the lock, as is 
already done correctly for deletions on lines 249-258.
   ```suggestion
                        var (
                                added    bool
                                metadata schema.Metadata
                        )
   
                        s.cacheMutex.Lock()
                        if _, alreadyAdded := s.nodeCache[n.Address]; 
!alreadyAdded {
                                s.nodeCache[n.Address] = node
                                added = true
                                metadata = schema.Metadata{
                                        TypeMeta: schema.TypeMeta{
                                                Kind: schema.KindNode,
                                                Name: 
node.GetMetadata().GetName(),
                                        },
                                        Spec: node,
                                }
                        }
                        s.cacheMutex.Unlock()
   
                        if added {
                                // notify handlers after releasing lock
                                s.notifyHandlers(metadata, true)
   
                                s.log.Debug().
                                        Str("address", n.Address).
                                        Str("name", 
node.GetMetadata().GetName()).
                                        Msg("New node discovered and added to 
cache")
                        }
   ```
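   The ordering in the suggestion above can be tried in isolation. The sketch 
below is a minimal, self-contained illustration, not the PR's code: the 
`registry` type, its single mutex, and the `add` and `size` methods are 
hypothetical stand-ins for the Service, its cache lock, and its handlers. It 
only shows why callbacks are invoked after the lock is released.

   ```go
   package main

   import (
   	"fmt"
   	"sync"
   )

   // registry is a hypothetical stand-in for the discovery service:
   // a cache guarded by a mutex plus a list of callbacks.
   type registry struct {
   	mu       sync.Mutex
   	cache    map[string]string
   	handlers []func(addr string)
   }

   // add stores the entry while holding the lock and records whether it was
   // new. Handlers run only after the lock has been released, so a handler
   // may call back into the registry without deadlocking.
   func (r *registry) add(addr, name string) bool {
   	r.mu.Lock()
   	_, exists := r.cache[addr]
   	if !exists {
   		r.cache[addr] = name
   	}
   	// Snapshot the handlers under the lock so the loop below needs no lock.
   	handlers := append([]func(string){}, r.handlers...)
   	r.mu.Unlock()

   	if exists {
   		return false
   	}
   	for _, h := range handlers {
   		h(addr)
   	}
   	return true
   }

   // size takes the same lock. Because sync.Mutex is not reentrant, calling
   // size from a handler would block forever if add notified under the lock.
   func (r *registry) size() int {
   	r.mu.Lock()
   	defer r.mu.Unlock()
   	return len(r.cache)
   }

   func main() {
   	r := &registry{cache: make(map[string]string)}
   	r.handlers = append(r.handlers, func(addr string) {
   		fmt.Println("added", addr, "cache size", r.size())
   	})
   	r.add("10.0.0.1:17912", "data-0")
   	r.add("10.0.0.1:17912", "data-0") // duplicate: no notification
   }
   ```

   In this sketch the handler reads the cache size through the same mutex, 
which is the kind of re-entrant call that would hang if the notification were 
sent before unlocking.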



##########
banyand/metadata/discovery/file/file.go:
##########
@@ -0,0 +1,352 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package file implements file-based node discovery for distributed metadata 
management.
+package file
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "os"
+       "sync"
+       "time"
+
+       "gopkg.in/yaml.v3"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// Service implements file-based node discovery.
+type Service struct {
+       nodeCache     map[string]*databasev1.Node
+       closer        *run.Closer
+       log           *logger.Logger
+       metrics       *metrics
+       handlers      map[string]schema.EventHandler
+       filePath      string
+       grpcTimeout   time.Duration
+       fetchInterval time.Duration
+       cacheMutex    sync.RWMutex
+       handlersMutex sync.RWMutex
+}
+
+// Config holds configuration for file discovery service.
+type Config struct {
+       FilePath      string
+       GRPCTimeout   time.Duration
+       FetchInterval time.Duration
+}
+
+// NodeFileConfig represents the YAML configuration file structure.
+type NodeFileConfig struct {
+       Nodes []NodeConfig `yaml:"nodes"`
+}
+
+// NodeConfig represents a single node configuration.
+type NodeConfig struct {
+       Name       string `yaml:"name"`
+       Address    string `yaml:"grpc_address"`
+       CACertPath string `yaml:"ca_cert_path"`
+       TLSEnabled bool   `yaml:"tls_enabled"`
+}
+
+// NewService creates a new file discovery service.
+func NewService(cfg Config) (*Service, error) {
+       if cfg.FilePath == "" {
+               return nil, errors.New("file path cannot be empty")
+       }
+
+       // validate file exists and is readable
+       if _, err := os.Stat(cfg.FilePath); err != nil {
+               return nil, fmt.Errorf("failed to access file path %s: %w", 
cfg.FilePath, err)
+       }
+
+       svc := &Service{
+               filePath:      cfg.FilePath,
+               nodeCache:     make(map[string]*databasev1.Node),
+               handlers:      make(map[string]schema.EventHandler),
+               closer:        run.NewCloser(1),
+               log:           logger.GetLogger("metadata-discovery-file"),
+               grpcTimeout:   cfg.GRPCTimeout,
+               fetchInterval: cfg.FetchInterval,
+       }
+
+       return svc, nil
+}
+
+// Start begins the file discovery background process.
+func (s *Service) Start(ctx context.Context) error {
+       s.log.Debug().Str("file_path", s.filePath).Msg("Starting file-based 
node discovery service")
+
+       // initial load
+       if err := s.loadAndParseFile(ctx); err != nil {
+               return fmt.Errorf("failed to load initial configuration: %w", 
err)
+       }
+
+       // start periodic fetch loop
+       go s.periodicFetch(ctx)
+
+       return nil
+}
+
+func (s *Service) loadAndParseFile(ctx context.Context) error {
+       startTime := time.Now()
+       var parseErr error
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.fileLoadCount.Inc(1)
+                       s.metrics.fileLoadDuration.Observe(duration.Seconds())
+                       if parseErr != nil {
+                               s.metrics.fileLoadFailedCount.Inc(1)
+                       }
+               }
+       }()
+
+       data, err := os.ReadFile(s.filePath)
+       if err != nil {
+               parseErr = fmt.Errorf("failed to read file: %w", err)
+               return parseErr
+       }
+
+       var cfg NodeFileConfig
+       if err := yaml.Unmarshal(data, &cfg); err != nil {
+               parseErr = fmt.Errorf("failed to parse YAML: %w", err)
+               return parseErr
+       }
+
+       // validate required fields
+       for idx, node := range cfg.Nodes {
+               if node.Address == "" {
+                       parseErr = fmt.Errorf("node %s at index %d is missing 
required field: grpc_address", node.Name, idx)
+                       return parseErr
+               }
+               if node.TLSEnabled && node.CACertPath == "" {
+                       parseErr = fmt.Errorf("node %s at index %d has TLS 
enabled but missing ca_cert_path", node.Name, idx)
+                       return parseErr
+               }
+       }
+
+       // update cache
+       s.updateNodeCache(ctx, cfg.Nodes)
+
+       s.log.Debug().Int("node_count", len(cfg.Nodes)).Msg("Successfully 
loaded configuration file")
+       return nil
+}
+
+func (s *Service) fetchNodeMetadata(ctx context.Context, nodeConfig 
NodeConfig) (*databasev1.Node, error) {
+       ctxTimeout, cancel := context.WithTimeout(ctx, s.grpcTimeout)
+       defer cancel()
+
+       // prepare TLS options
+       dialOpts, err := grpchelper.SecureOptions(nil, nodeConfig.TLSEnabled, 
false, nodeConfig.CACertPath)
+       if err != nil {
+               return nil, fmt.Errorf("failed to load TLS config for node %s: 
%w", nodeConfig.Name, err)
+       }
+
+       // connect to node
+       // nolint:contextcheck
+       conn, connErr := grpchelper.Conn(nodeConfig.Address, s.grpcTimeout, 
dialOpts...)
+       if connErr != nil {
+               return nil, fmt.Errorf("failed to connect to %s: %w", 
nodeConfig.Address, connErr)
+       }
+       defer conn.Close()
+
+       // query metadata of the node
+       client := databasev1.NewNodeQueryServiceClient(conn)
+       resp, callErr := client.GetCurrentNode(ctxTimeout, 
&databasev1.GetCurrentNodeRequest{})
+       if callErr != nil {
+               return nil, fmt.Errorf("failed to get current node from %s: 
%w", nodeConfig.Address, callErr)
+       }
+
+       return resp.GetNode(), nil
+}
+
+func (s *Service) updateNodeCache(ctx context.Context, newNodes []NodeConfig) {
+       for _, n := range newNodes {
+               s.cacheMutex.RLock()
+               _, exists := s.nodeCache[n.Address]
+               s.cacheMutex.RUnlock()
+
+               if !exists {
+                       // fetch node metadata from gRPC
+                       node, fetchErr := s.fetchNodeMetadata(ctx, n)
+                       if fetchErr != nil {
+                               s.log.Warn().
+                                       Err(fetchErr).
+                                       Str("node", n.Name).
+                                       Str("address", n.Address).
+                                       Msg("Failed to fetch node metadata, 
will skip")
+                               continue
+                       }
+
+                       s.cacheMutex.Lock()
+                       if _, alreadyAdded := s.nodeCache[n.Address]; 
!alreadyAdded {
+                               s.nodeCache[n.Address] = node
+
+                               // notify handlers after releasing lock
+                               s.notifyHandlers(schema.Metadata{
+                                       TypeMeta: schema.TypeMeta{
+                                               Kind: schema.KindNode,
+                                               Name: 
node.GetMetadata().GetName(),
+                                       },
+                                       Spec: node,
+                               }, true)
+
+                               s.log.Debug().
+                                       Str("address", n.Address).
+                                       Str("name", 
node.GetMetadata().GetName()).
+                                       Msg("New node discovered and added to 
cache")
+                       }
+                       s.cacheMutex.Unlock()
+               }
+       }
+
+       // collect nodes to delete first
+       allAddr := make(map[string]bool)
+       for _, n := range newNodes {
+               allAddr[n.Address] = true
+       }
+       s.cacheMutex.Lock()
+       nodesToDelete := make(map[string]*databasev1.Node)
+       for addr, node := range s.nodeCache {
+               if !allAddr[addr] {
+                       nodesToDelete[addr] = node
+               }
+       }
+
+       // delete from cache while still holding lock
+       for addr, node := range nodesToDelete {
+               delete(s.nodeCache, addr)
+               s.log.Debug().
+                       Str("address", addr).
+                       Str("name", node.GetMetadata().GetName()).
+                       Msg("Node removed from cache (no longer in file)")
+       }
+       cacheSize := len(s.nodeCache)
+       s.cacheMutex.Unlock()
+
+       // Notify handlers after releasing lock
+       for _, node := range nodesToDelete {
+               s.notifyHandlers(schema.Metadata{
+                       TypeMeta: schema.TypeMeta{
+                               Kind: schema.KindNode,
+                               Name: node.GetMetadata().GetName(),
+                       },
+                       Spec: node,
+               }, false)
+       }
+       // update metrics
+       if s.metrics != nil {
+               s.metrics.totalNodesCount.Set(float64(cacheSize))
+       }
+}
+
+func (s *Service) notifyHandlers(metadata schema.Metadata, isAddOrUpdate bool) 
{
+       s.handlersMutex.RLock()
+       defer s.handlersMutex.RUnlock()
+
+       for _, handler := range s.handlers {
+               if isAddOrUpdate {
+                       handler.OnAddOrUpdate(metadata)
+               } else {
+                       handler.OnDelete(metadata)
+               }
+       }
+}
+
+// RegisterHandler registers an event handler for node changes.
+func (s *Service) RegisterHandler(name string, handler schema.EventHandler) {
+       s.handlersMutex.Lock()
+       defer s.handlersMutex.Unlock()
+
+       s.handlers[name] = handler
+       s.log.Debug().Str("handler", name).Msg("Registered file node discovery 
handler")
+}
+
+// SetMetrics sets the OMR metrics.
+func (s *Service) SetMetrics(factory observability.Factory) {
+       s.metrics = newMetrics(factory)
+}
+
+// Close stops the file discovery service.
+func (s *Service) Close() error {
+       s.closer.Done()
+       s.closer.CloseThenWait()
+
+       return nil
+}
+
+// ListNode lists all existing nodes from cache.
+func (s *Service) ListNode(_ context.Context, role databasev1.Role) 
([]*databasev1.Node, error) {
+       s.cacheMutex.RLock()
+       defer s.cacheMutex.RUnlock()
+
+       var result []*databasev1.Node
+       for _, node := range s.nodeCache {
+               // file mode doesn't support role filtering as roles are not 
stored in config
+               // return all nodes when ROLE_UNSPECIFIED, otherwise return 
empty list
+               if role == databasev1.Role_ROLE_UNSPECIFIED {

Review Comment:
   The code comment states that file mode does not support role filtering and 
returns an empty list for any role other than ROLE_UNSPECIFIED. However, nodes 
fetched via gRPC (fetchNodeMetadata) include role information in their 
metadata. Consider fetching and storing that role information, then 
implementing proper role filtering as DNS discovery does. Silently returning 
empty results could confuse users who expect role-based filtering to work.
   ```suggestion
                // return all nodes when ROLE_UNSPECIFIED, otherwise filter by 
node role
                if role == databasev1.Role_ROLE_UNSPECIFIED || node.GetRole() 
== role {
   ```
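   The filtering rule in the suggestion above can be sketched on its own. The 
`Role` and `Node` types below are hypothetical local stand-ins, not the 
generated databasev1 types, and the sketch assumes a node may carry more than 
one role; the accessor available on the real generated type should be checked 
before adapting this.

   ```go
   package main

   import "fmt"

   // Role and Node are hypothetical stand-ins for the generated types.
   // The multi-role field layout is an assumption made for illustration.
   type Role int

   const (
   	RoleUnspecified Role = iota
   	RoleData
   	RoleLiaison
   )

   type Node struct {
   	Name  string
   	Roles []Role
   }

   // hasRole reports whether the node lists the requested role.
   func hasRole(n Node, role Role) bool {
   	for _, r := range n.Roles {
   		if r == role {
   			return true
   		}
   	}
   	return false
   }

   // filterByRole returns every node when role is RoleUnspecified,
   // otherwise only the nodes that list the requested role.
   func filterByRole(nodes []Node, role Role) []Node {
   	var out []Node
   	for _, n := range nodes {
   		if role == RoleUnspecified || hasRole(n, role) {
   			out = append(out, n)
   		}
   	}
   	return out
   }

   func main() {
   	nodes := []Node{
   		{Name: "data-0", Roles: []Role{RoleData}},
   		{Name: "liaison-0", Roles: []Role{RoleLiaison}},
   	}
   	for _, n := range filterByRole(nodes, RoleData) {
   		fmt.Println(n.Name)
   	}
   }
   ```

   With this rule a request for an unspecified role still returns the whole 
cache, while a request for a specific role returns only matching nodes instead 
of an empty list.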



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
