Copilot commented on code in PR #928:
URL:
https://github.com/apache/skywalking-banyandb/pull/928#discussion_r2667382813
##########
banyand/metadata/client.go:
##########
@@ -360,6 +414,10 @@ func (s *clientService) NodeRegistry() schema.Node {
if s.dnsDiscovery != nil {
return s.dnsDiscovery
}
+ // If file discovery is enabled, use it instead of etcd
+ if s.fileDiscovery != nil {
+ return s.fileDiscovery
+ }
// Otherwise use etcd schema registry
return s.schemaRegistry
Review Comment:
NodeRegistry() and RegisterHandler() check the discovery services in different
orders. RegisterHandler checks file discovery before DNS discovery (lines
373-379), while NodeRegistry checks DNS discovery before file discovery (lines
414-422). The mismatch is confusing, and if both services were ever set, the
two methods could act on different backends. Consider using one priority order
across all methods.
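One way to keep the order consistent is to route both methods through a single
selector. The sketch below is illustrative only: the `registry` interface, the
`client` struct, and `handlerTarget` are hypothetical stand-ins rather than the
real `clientService` API, and the DNS-first order is an arbitrary choice made
for the example.

```go
package main

import "fmt"

// registry is a hypothetical stand-in for schema.Node.
type registry interface{ Name() string }

type named string

func (n named) Name() string { return string(n) }

// client only loosely mirrors clientService; every field is illustrative.
type client struct {
	dnsDiscovery   registry
	fileDiscovery  registry
	schemaRegistry registry
}

// activeRegistry is the single place that encodes the priority order:
// DNS first, then file, then the etcd-backed schema registry.
func (c *client) activeRegistry() registry {
	if c.dnsDiscovery != nil {
		return c.dnsDiscovery
	}
	if c.fileDiscovery != nil {
		return c.fileDiscovery
	}
	return c.schemaRegistry
}

// NodeRegistry and handlerTarget both defer to activeRegistry, so they
// cannot disagree about which backend is in use.
func (c *client) NodeRegistry() registry { return c.activeRegistry() }

func (c *client) handlerTarget() registry { return c.activeRegistry() }

func main() {
	c := &client{
		fileDiscovery:  named("file"),
		schemaRegistry: named("etcd"),
	}
	fmt.Println(c.NodeRegistry().Name(), c.handlerTarget().Name())
}
```

With only one function holding the `if` chain, adding another discovery mode
later touches a single place.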
##########
docs/operation/node-discovery.md:
##########
@@ -278,6 +279,102 @@ spec:
This creates DNS SRV record:
`_grpc._tcp.banyandb-data.default.svc.cluster.local`
+## File-Based Discovery
+
+### Overview
+
+File-based discovery provides a simple static configuration approach where
nodes are defined in a YAML file. This mode is ideal for testing environments,
small deployments, or scenarios where dynamic service discovery infrastructure
is unavailable.
+
+The service monitors the configuration file for changes and automatically
updates the node registry when the file is modified.
+
+### How it Works
+
+1. Read node configurations from a YAML file on startup
+2. Attempt to connect to each node via gRPC to fetch full metadata
+3. Successfully connected nodes are added to the cache
+4. Failed nodes are tracked separately and retried periodically
+5. Watch the file for changes and reload automatically
Review Comment:
The documentation describes retry behavior for failed nodes (lines 296, 312,
and 364), but the implementation uses FetchInterval for periodic file reloading
in the periodicFetch function, not for retrying failed nodes specifically.
Failed nodes are simply skipped and are attempted again on the next periodic
file load. Consider clarifying that this interval controls how often the file
is reloaded and all nodes are re-attempted, rather than implying a dedicated
retry queue.
```suggestion
The service periodically reloads the configuration file and automatically
updates the node registry when changes are detected.
### How it Works
1. Read node configurations from a YAML file on startup
2. Attempt to connect to each node via gRPC to fetch full metadata
3. Successfully connected nodes are added to the cache
4. Nodes that fail to connect are skipped and will be attempted again on the
next periodic file reload
5. Reload the file at a configured interval (FetchInterval), reprocessing
all nodes (including previously failed ones)
```
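The reload-without-a-retry-queue behavior can be sketched as follows. This is a
simplified model, not the PR's code: `nodeConfig`, `fetchFunc`, `reload`, and
`failFirst` are hypothetical names, the cache holds plain strings instead of
`*databasev1.Node`, and the fetch function replaces the gRPC call. Like the
`updateNodeCache` shown in this PR, it only fetches nodes that are not cached
yet.

```go
package main

import (
	"errors"
	"fmt"
)

type nodeConfig struct{ Name, Address string }

// fetchFunc stands in for the gRPC metadata call.
type fetchFunc func(nodeConfig) (string, error)

// reload walks every node listed in the file. Nodes already cached are left
// alone; nodes that are missing (new, or failed last time) are fetched.
// There is no separate list of failed nodes.
func reload(cache map[string]string, nodes []nodeConfig, fetch fetchFunc) {
	for _, n := range nodes {
		if _, ok := cache[n.Address]; ok {
			continue
		}
		meta, err := fetch(n)
		if err != nil {
			continue // skipped; the next reload tries again
		}
		cache[n.Address] = meta
	}
}

// failFirst returns a fetchFunc that fails its first n calls, then succeeds.
func failFirst(n int) fetchFunc {
	calls := 0
	return func(cfg nodeConfig) (string, error) {
		calls++
		if calls <= n {
			return "", errors.New("connection refused")
		}
		return "metadata for " + cfg.Name, nil
	}
}

func main() {
	cache := map[string]string{}
	nodes := []nodeConfig{{Name: "data-hot-0", Address: "192.168.1.20:17912"}}
	fetch := failFirst(1)

	reload(cache, nodes, fetch)
	fmt.Println("cached after first reload:", len(cache))
	reload(cache, nodes, fetch)
	fmt.Println("cached after second reload:", len(cache))
}
```

The failed node reaches the cache on the second pass only because the whole
node list is processed again, which is the behavior the documentation should
describe.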
##########
docs/operation/node-discovery.md:
##########
@@ -278,6 +279,102 @@ spec:
This creates DNS SRV record:
`_grpc._tcp.banyandb-data.default.svc.cluster.local`
+## File-Based Discovery
+
+### Overview
+
+File-based discovery provides a simple static configuration approach where
nodes are defined in a YAML file. This mode is ideal for testing environments,
small deployments, or scenarios where dynamic service discovery infrastructure
is unavailable.
+
+The service monitors the configuration file for changes and automatically
updates the node registry when the file is modified.
+
+### How it Works
+
+1. Read node configurations from a YAML file on startup
+2. Attempt to connect to each node via gRPC to fetch full metadata
+3. Successfully connected nodes are added to the cache
+4. Failed nodes are tracked separately and retried periodically
+5. Watch the file for changes and reload automatically
+6. Notify registered handlers when nodes are added or removed
+
+### Configuration Flags
+
+```shell
+# Mode selection
+--node-discovery-mode=file # Enable file mode
+
+# File path (required for file mode)
+--node-discovery-file-path=/path/to/nodes.yaml
+
+# gRPC settings
+--node-discovery-grpc-timeout=5s # Timeout for metadata fetch
(default: 5s)
+
+# Retry settings
+--node-discovery-file-retry-interval=20s # Retry interval for failed nodes
(default: 20s)
Review Comment:
The flag name and its comment (default: 20s) present this as a retry interval
for failed nodes, but the flag on line 312 actually sets the periodic
fetch/reload interval for the discovery file (line 306 is the file-path flag).
The documentation should state that this interval controls how often the file
is checked for changes, not only how often failed nodes are retried.
```suggestion
--node-discovery-file-retry-interval=20s # Interval to poll the discovery file and retry failed nodes (default: 20s)
```
##########
docs/operation/node-discovery.md:
##########
@@ -278,6 +279,102 @@ spec:
This creates DNS SRV record:
`_grpc._tcp.banyandb-data.default.svc.cluster.local`
+## File-Based Discovery
+
+### Overview
+
+File-based discovery provides a simple static configuration approach where
nodes are defined in a YAML file. This mode is ideal for testing environments,
small deployments, or scenarios where dynamic service discovery infrastructure
is unavailable.
+
+The service monitors the configuration file for changes and automatically
updates the node registry when the file is modified.
+
+### How it Works
+
+1. Read node configurations from a YAML file on startup
+2. Attempt to connect to each node via gRPC to fetch full metadata
+3. Successfully connected nodes are added to the cache
+4. Failed nodes are tracked separately and retried periodically
+5. Watch the file for changes and reload automatically
+6. Notify registered handlers when nodes are added or removed
+
+### Configuration Flags
+
+```shell
+# Mode selection
+--node-discovery-mode=file # Enable file mode
+
+# File path (required for file mode)
+--node-discovery-file-path=/path/to/nodes.yaml
+
+# gRPC settings
+--node-discovery-grpc-timeout=5s # Timeout for metadata fetch
(default: 5s)
+
+# Retry settings
+--node-discovery-file-retry-interval=20s # Retry interval for failed nodes
(default: 20s)
+```
+
+### YAML Configuration Format
+
+```yaml
+nodes:
+ - name: liaison-0
+ grpc_address: 192.168.1.10:18912
+ - name: data-hot-0
+ grpc_address: 192.168.1.20:17912
+ tls_enabled: true
+ ca_cert_path: /etc/banyandb/certs/ca.crt
+ - name: data-cold-0
+ grpc_address: 192.168.1.30:17912
+```
+
+**Configuration Fields:**
+
+- **name** (required): Identifier for the node
+- **grpc_address** (required): gRPC endpoint in `host:port` format
+- **tls_enabled** (optional): Enable TLS for gRPC connection (default: false)
+- **ca_cert_path** (optional): Path to CA certificate file (required when TLS
is enabled)
+
+### Configuration Examples
+
+**Basic Configuration:**
+
+```shell
+banyand data \
+ --node-discovery-mode=file \
+ --node-discovery-file-path=/etc/banyandb/nodes.yaml
+```
+
+**With Custom Retry Interval:**
+
+```shell
+banyand liaison \
+ --node-discovery-mode=file \
+ --node-discovery-file-path=/etc/banyandb/nodes.yaml \
+ --node-discovery-file-retry-interval=30s
+```
+
+### Node Lifecycle
+
+#### Initial/Interval Load
+
+When the service starts:
+1. Reads the YAML configuration file
+2. Validates required fields (`grpc_address`)
+3. Attempts gRPC connection to each node
+4. Successfully connected nodes → added to cache
+5. Failed nodes → wait for the next interval
Review Comment:
The documentation states "Failed nodes → wait for the next interval", which is
misleading. The implementation doesn't maintain a separate retry queue
for failed nodes. Instead, on each periodic fetch interval, the entire file is
reloaded and all nodes (including previously failed ones) are attempted again.
The term "retry queue" on line 374 is inaccurate for the same reason.
##########
banyand/metadata/discovery/file/file_test.go:
##########
@@ -0,0 +1,461 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package file
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "os"
+ "path/filepath"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/health"
+ grpc_health_v1 "google.golang.org/grpc/health/grpc_health_v1"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+)
+
+const (
+ testGRPCTimeout = 2 * time.Second
+ testFetchInterval = 200 * time.Millisecond
+)
+
+func TestNewService(t *testing.T) {
+ t.Run("valid config", func(t *testing.T) {
+ configFile := createTempConfigFile(t, `
+nodes:
+ - name: node1
+ grpc_address: 127.0.0.1:17912
+`)
+ defer os.Remove(configFile)
+
+ svc, err := NewService(Config{
+ FilePath: configFile,
+ GRPCTimeout: testGRPCTimeout,
+ FetchInterval: testFetchInterval,
+ })
+ require.NoError(t, err)
+ require.NotNil(t, svc)
+ require.NoError(t, svc.Close())
+ })
+
+ t.Run("empty file path", func(t *testing.T) {
+ _, err := NewService(Config{FilePath: ""})
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "file path cannot be empty")
+ })
+
+ t.Run("non-existent file", func(t *testing.T) {
+ _, err := NewService(Config{
+ FilePath: "/not/exist",
+ GRPCTimeout: testGRPCTimeout,
+ FetchInterval: testFetchInterval,
+ })
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "failed to access file path")
+ })
+}
+
+func TestStartWithInvalidConfig(t *testing.T) {
+ ctx := context.Background()
+
+ t.Run("invalid yaml", func(t *testing.T) {
+ configFile := createTempConfigFile(t, `
+nodes:
+ - name: node1
+ grpc_address: [invalid
+`)
+ defer os.Remove(configFile)
+
+ svc, err := NewService(Config{
+ FilePath: configFile,
+ GRPCTimeout: testGRPCTimeout,
+ FetchInterval: testFetchInterval,
+ })
+ require.NoError(t, err)
+ defer svc.Close()
+
+ err = svc.Start(ctx)
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "failed to parse YAML")
+ })
+
+ t.Run("missing address", func(t *testing.T) {
+ configFile := createTempConfigFile(t, `
+nodes:
+ - name: node1
+`)
+ defer os.Remove(configFile)
+
+ svc, err := NewService(Config{
+ FilePath: configFile,
+ GRPCTimeout: testGRPCTimeout,
+ FetchInterval: testFetchInterval,
+ })
+ require.NoError(t, err)
+ defer svc.Close()
+
+ err = svc.Start(ctx)
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "missing required field:
grpc_address")
+ })
+
+ t.Run("tls enabled without ca cert", func(t *testing.T) {
+ configFile := createTempConfigFile(t, `
+nodes:
+ - name: node1
+ grpc_address: 127.0.0.1:17912
+ tls_enabled: true
+`)
+ defer os.Remove(configFile)
+
+ svc, err := NewService(Config{
+ FilePath: configFile,
+ GRPCTimeout: testGRPCTimeout,
+ FetchInterval: testFetchInterval,
+ })
+ require.NoError(t, err)
+ defer svc.Close()
+
+ err = svc.Start(ctx)
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "missing ca_cert_path")
+ })
+}
+
+func TestStartAndCacheNodes(t *testing.T) {
+ listener, grpcServer, nodeServer := startMockGRPCServer(t)
+ defer grpcServer.Stop()
+ defer listener.Close()
+
+ nodeName := "test-node"
+ serverAddr := listener.Addr().String()
+ nodeServer.setNode(newTestNode(nodeName, serverAddr))
+
+ configFile := createTempConfigFile(t, fmt.Sprintf(`
+nodes:
+ - name: %s
+ grpc_address: %s
+`, nodeName, serverAddr))
+ defer os.Remove(configFile)
+
+ svc, err := NewService(Config{
+ FilePath: configFile,
+ GRPCTimeout: testGRPCTimeout,
+ FetchInterval: testFetchInterval,
+ })
+ require.NoError(t, err)
+ defer svc.Close()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ require.NoError(t, svc.Start(ctx))
+
+ nodes, listErr := svc.ListNode(ctx, databasev1.Role_ROLE_UNSPECIFIED)
+ require.NoError(t, listErr)
+ require.Len(t, nodes, 1)
+ assert.Equal(t, nodeName, nodes[0].GetMetadata().GetName())
+ assert.Equal(t, serverAddr, nodes[0].GetGrpcAddress())
+
+ nodeFromCache, getErr := svc.GetNode(ctx, serverAddr)
Review Comment:
The test passes serverAddr (an address like "127.0.0.1:12345") to GetNode, but
based on the interface contract established by the etcd and DNS
implementations, GetNode should accept a node name, not an address. The test
passes only because the file implementation incorrectly uses the address as the
cache key, so it is verifying incorrect behavior. Once the implementation is
fixed to look up by name, the test should call GetNode with the node name.
```suggestion
nodeFromCache, getErr := svc.GetNode(ctx, nodeName)
```
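A name-keyed cache could look roughly like the sketch below. The `node` and
`nodeCache` types and their methods are hypothetical and much simpler than the
real `Service`; the sketch only shows a lookup by name succeeding where a
lookup by address fails.

```go
package main

import (
	"fmt"
	"sync"
)

// node is a simplified stand-in for *databasev1.Node.
type node struct{ Name, GRPCAddress string }

// nodeCache keys entries by node name rather than by gRPC address.
type nodeCache struct {
	mu     sync.RWMutex
	byName map[string]node
}

func newNodeCache() *nodeCache {
	return &nodeCache{byName: make(map[string]node)}
}

func (c *nodeCache) put(n node) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.byName[n.Name] = n
}

// getNode looks a node up by name, matching the contract the review
// comment attributes to the etcd and DNS implementations.
func (c *nodeCache) getNode(name string) (node, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if n, ok := c.byName[name]; ok {
		return n, nil
	}
	return node{}, fmt.Errorf("node %s not found", name)
}

func main() {
	cache := newNodeCache()
	cache.put(node{Name: "test-node", GRPCAddress: "127.0.0.1:17912"})

	n, err := cache.getNode("test-node")
	fmt.Println(n.GRPCAddress, err)

	_, err = cache.getNode("127.0.0.1:17912")
	fmt.Println(err)
}
```

If the file still needs to detect removed entries by address, the sketch would
need a second index or a scan; that trade-off is left open here.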
##########
banyand/metadata/discovery/file/file.go:
##########
@@ -0,0 +1,352 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package file implements file-based node discovery for distributed metadata
management.
+package file
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "sync"
+ "time"
+
+ "gopkg.in/yaml.v3"
+
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// Service implements file-based node discovery.
+type Service struct {
+ nodeCache map[string]*databasev1.Node
+ closer *run.Closer
+ log *logger.Logger
+ metrics *metrics
+ handlers map[string]schema.EventHandler
+ filePath string
+ grpcTimeout time.Duration
+ fetchInterval time.Duration
+ cacheMutex sync.RWMutex
+ handlersMutex sync.RWMutex
+}
+
+// Config holds configuration for file discovery service.
+type Config struct {
+ FilePath string
+ GRPCTimeout time.Duration
+ FetchInterval time.Duration
+}
+
+// NodeFileConfig represents the YAML configuration file structure.
+type NodeFileConfig struct {
+ Nodes []NodeConfig `yaml:"nodes"`
+}
+
+// NodeConfig represents a single node configuration.
+type NodeConfig struct {
+ Name string `yaml:"name"`
+ Address string `yaml:"grpc_address"`
+ CACertPath string `yaml:"ca_cert_path"`
+ TLSEnabled bool `yaml:"tls_enabled"`
+}
+
+// NewService creates a new file discovery service.
+func NewService(cfg Config) (*Service, error) {
+ if cfg.FilePath == "" {
+ return nil, errors.New("file path cannot be empty")
+ }
+
+ // validate file exists and is readable
+ if _, err := os.Stat(cfg.FilePath); err != nil {
+ return nil, fmt.Errorf("failed to access file path %s: %w",
cfg.FilePath, err)
+ }
+
+ svc := &Service{
+ filePath: cfg.FilePath,
+ nodeCache: make(map[string]*databasev1.Node),
+ handlers: make(map[string]schema.EventHandler),
+ closer: run.NewCloser(1),
+ log: logger.GetLogger("metadata-discovery-file"),
+ grpcTimeout: cfg.GRPCTimeout,
+ fetchInterval: cfg.FetchInterval,
+ }
+
+ return svc, nil
+}
+
+// Start begins the file discovery background process.
+func (s *Service) Start(ctx context.Context) error {
+ s.log.Debug().Str("file_path", s.filePath).Msg("Starting file-based
node discovery service")
+
+ // initial load
+ if err := s.loadAndParseFile(ctx); err != nil {
+ return fmt.Errorf("failed to load initial configuration: %w",
err)
+ }
+
+ // start periodic fetch loop
+ go s.periodicFetch(ctx)
+
+ return nil
+}
+
+func (s *Service) loadAndParseFile(ctx context.Context) error {
+ startTime := time.Now()
+ var parseErr error
+ defer func() {
+ if s.metrics != nil {
+ duration := time.Since(startTime)
+ s.metrics.fileLoadCount.Inc(1)
+ s.metrics.fileLoadDuration.Observe(duration.Seconds())
+ if parseErr != nil {
+ s.metrics.fileLoadFailedCount.Inc(1)
+ }
+ }
+ }()
+
+ data, err := os.ReadFile(s.filePath)
+ if err != nil {
+ parseErr = fmt.Errorf("failed to read file: %w", err)
+ return parseErr
+ }
+
+ var cfg NodeFileConfig
+ if err := yaml.Unmarshal(data, &cfg); err != nil {
+ parseErr = fmt.Errorf("failed to parse YAML: %w", err)
+ return parseErr
+ }
+
+ // validate required fields
+ for idx, node := range cfg.Nodes {
+ if node.Address == "" {
+ parseErr = fmt.Errorf("node %s at index %d is missing
required field: grpc_address", node.Name, idx)
+ return parseErr
+ }
+ if node.TLSEnabled && node.CACertPath == "" {
+ parseErr = fmt.Errorf("node %s at index %d has TLS
enabled but missing ca_cert_path", node.Name, idx)
+ return parseErr
+ }
+ }
+
+ // update cache
+ s.updateNodeCache(ctx, cfg.Nodes)
+
+ s.log.Debug().Int("node_count", len(cfg.Nodes)).Msg("Successfully
loaded configuration file")
+ return nil
+}
+
+func (s *Service) fetchNodeMetadata(ctx context.Context, nodeConfig
NodeConfig) (*databasev1.Node, error) {
+ ctxTimeout, cancel := context.WithTimeout(ctx, s.grpcTimeout)
+ defer cancel()
+
+ // prepare TLS options
+ dialOpts, err := grpchelper.SecureOptions(nil, nodeConfig.TLSEnabled,
false, nodeConfig.CACertPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to load TLS config for node %s:
%w", nodeConfig.Name, err)
+ }
+
+ // connect to node
+ // nolint:contextcheck
+ conn, connErr := grpchelper.Conn(nodeConfig.Address, s.grpcTimeout,
dialOpts...)
+ if connErr != nil {
+ return nil, fmt.Errorf("failed to connect to %s: %w",
nodeConfig.Address, connErr)
+ }
+ defer conn.Close()
+
+ // query metadata of the node
+ client := databasev1.NewNodeQueryServiceClient(conn)
+ resp, callErr := client.GetCurrentNode(ctxTimeout,
&databasev1.GetCurrentNodeRequest{})
+ if callErr != nil {
+ return nil, fmt.Errorf("failed to get current node from %s:
%w", nodeConfig.Address, callErr)
+ }
+
+ return resp.GetNode(), nil
+}
+
+func (s *Service) updateNodeCache(ctx context.Context, newNodes []NodeConfig) {
+ for _, n := range newNodes {
+ s.cacheMutex.RLock()
+ _, exists := s.nodeCache[n.Address]
+ s.cacheMutex.RUnlock()
+
+ if !exists {
+ // fetch node metadata from gRPC
+ node, fetchErr := s.fetchNodeMetadata(ctx, n)
+ if fetchErr != nil {
+ s.log.Warn().
+ Err(fetchErr).
+ Str("node", n.Name).
+ Str("address", n.Address).
+ Msg("Failed to fetch node metadata,
will skip")
+ continue
+ }
+
+ s.cacheMutex.Lock()
+ if _, alreadyAdded := s.nodeCache[n.Address];
!alreadyAdded {
+ s.nodeCache[n.Address] = node
+
+ // notify handlers after releasing lock
+ s.notifyHandlers(schema.Metadata{
+ TypeMeta: schema.TypeMeta{
+ Kind: schema.KindNode,
+ Name:
node.GetMetadata().GetName(),
+ },
+ Spec: node,
+ }, true)
+
+ s.log.Debug().
+ Str("address", n.Address).
+ Str("name",
node.GetMetadata().GetName()).
+ Msg("New node discovered and added to
cache")
+ }
+ s.cacheMutex.Unlock()
+ }
+ }
+
+ // collect nodes to delete first
+ allAddr := make(map[string]bool)
+ for _, n := range newNodes {
+ allAddr[n.Address] = true
+ }
+ s.cacheMutex.Lock()
+ nodesToDelete := make(map[string]*databasev1.Node)
+ for addr, node := range s.nodeCache {
+ if !allAddr[addr] {
+ nodesToDelete[addr] = node
+ }
+ }
+
+ // delete from cache while still holding lock
+ for addr, node := range nodesToDelete {
+ delete(s.nodeCache, addr)
+ s.log.Debug().
+ Str("address", addr).
+ Str("name", node.GetMetadata().GetName()).
+ Msg("Node removed from cache (no longer in file)")
+ }
+ cacheSize := len(s.nodeCache)
+ s.cacheMutex.Unlock()
+
+ // Notify handlers after releasing lock
+ for _, node := range nodesToDelete {
+ s.notifyHandlers(schema.Metadata{
+ TypeMeta: schema.TypeMeta{
+ Kind: schema.KindNode,
+ Name: node.GetMetadata().GetName(),
+ },
+ Spec: node,
+ }, false)
+ }
+ // update metrics
+ if s.metrics != nil {
+ s.metrics.totalNodesCount.Set(float64(cacheSize))
+ }
+}
+
+func (s *Service) notifyHandlers(metadata schema.Metadata, isAddOrUpdate bool)
{
+ s.handlersMutex.RLock()
+ defer s.handlersMutex.RUnlock()
+
+ for _, handler := range s.handlers {
+ if isAddOrUpdate {
+ handler.OnAddOrUpdate(metadata)
+ } else {
+ handler.OnDelete(metadata)
+ }
+ }
+}
+
+// RegisterHandler registers an event handler for node changes.
+func (s *Service) RegisterHandler(name string, handler schema.EventHandler) {
+ s.handlersMutex.Lock()
+ defer s.handlersMutex.Unlock()
+
+ s.handlers[name] = handler
+ s.log.Debug().Str("handler", name).Msg("Registered file node discovery
handler")
+}
+
+// SetMetrics sets the OMR metrics.
+func (s *Service) SetMetrics(factory observability.Factory) {
+ s.metrics = newMetrics(factory)
+}
+
+// Close stops the file discovery service.
+func (s *Service) Close() error {
+ s.closer.Done()
+ s.closer.CloseThenWait()
+
+ return nil
+}
+
+// ListNode lists all existing nodes from cache.
+func (s *Service) ListNode(_ context.Context, role databasev1.Role)
([]*databasev1.Node, error) {
+ s.cacheMutex.RLock()
+ defer s.cacheMutex.RUnlock()
+
+ var result []*databasev1.Node
+ for _, node := range s.nodeCache {
+ // file mode doesn't support role filtering as roles are not
stored in config
+ // return all nodes when ROLE_UNSPECIFIED, otherwise return
empty list
+ if role == databasev1.Role_ROLE_UNSPECIFIED {
+ result = append(result, node)
+ }
+ }
+
+ return result, nil
+}
+
+// GetNode gets a specific node from cache.
+func (s *Service) GetNode(_ context.Context, nodeName string)
(*databasev1.Node, error) {
+ s.cacheMutex.RLock()
+ defer s.cacheMutex.RUnlock()
+
+ if node, exists := s.nodeCache[nodeName]; exists {
+ return node, nil
+ }
+
+ return nil, fmt.Errorf("node %s not found", nodeName)
+}
+
+// RegisterNode is not supported in file discovery mode.
+func (s *Service) RegisterNode(_ context.Context, _ *databasev1.Node, _ bool)
error {
+ return errors.New("manual node registration not supported in file
discovery mode")
+}
+
+// UpdateNode is not supported in file discovery mode.
+func (s *Service) UpdateNode(_ context.Context, _ *databasev1.Node) error {
+ return errors.New("manual node update not supported in file discovery
mode")
+}
+
+func (s *Service) periodicFetch(ctx context.Context) {
+ fetchTicker := time.NewTicker(s.fetchInterval)
+
+ for {
+ select {
+ case <-fetchTicker.C:
+ if err := s.loadAndParseFile(ctx); err != nil {
+ s.log.Warn().Err(err).Msg("Failed to reload
configuration file")
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
Review Comment:
The periodicFetch function doesn't select on s.closer.CloseNotify() the way the
DNS discovery's discoveryLoop does (dns.go line 250). The loop exits only on
ctx.Done(), so the periodic ticker might not stop promptly when Close() is
called. Consider adding a case for s.closer.CloseNotify() in the select
statement to ensure clean shutdown.
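A minimal sketch of such a loop, under stated assumptions: `runPeriodic` and
`demo` are hypothetical names, and a plain `chan struct{}` stands in for the
channel returned by `s.closer.CloseNotify()`. The sketch also stops the ticker
on exit, which the loop shown in the diff above does not do.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runPeriodic calls reload on every tick until ctx is cancelled or closeCh is
// closed, and reports which of the two ended the loop. closeCh stands in for
// s.closer.CloseNotify().
func runPeriodic(ctx context.Context, closeCh <-chan struct{}, interval time.Duration, reload func()) string {
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release the ticker once the loop exits

	for {
		select {
		case <-ticker.C:
			reload()
		case <-closeCh:
			return "closed"
		case <-ctx.Done():
			return "context done"
		}
	}
}

// demo starts the loop with a context that is never cancelled and then
// closes the channel, so only the close signal can end the loop.
func demo() string {
	closeCh := make(chan struct{})
	done := make(chan string, 1)
	go func() {
		done <- runPeriodic(context.Background(), closeCh, 10*time.Millisecond, func() {})
	}()
	close(closeCh)
	return <-done
}

func main() {
	fmt.Println(demo())
}
```

Because the context in `demo` is never cancelled, the loop returning at all
shows that the close signal alone is enough to stop it.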
##########
banyand/metadata/discovery/file/file.go:
##########
@@ -0,0 +1,352 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package file implements file-based node discovery for distributed metadata
management.
+package file
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "sync"
+ "time"
+
+ "gopkg.in/yaml.v3"
+
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// Service implements file-based node discovery.
+type Service struct {
+ nodeCache map[string]*databasev1.Node
+ closer *run.Closer
+ log *logger.Logger
+ metrics *metrics
+ handlers map[string]schema.EventHandler
+ filePath string
+ grpcTimeout time.Duration
+ fetchInterval time.Duration
+ cacheMutex sync.RWMutex
+ handlersMutex sync.RWMutex
+}
+
+// Config holds configuration for file discovery service.
+type Config struct {
+ FilePath string
+ GRPCTimeout time.Duration
+ FetchInterval time.Duration
+}
+
+// NodeFileConfig represents the YAML configuration file structure.
+type NodeFileConfig struct {
+ Nodes []NodeConfig `yaml:"nodes"`
+}
+
+// NodeConfig represents a single node configuration.
+type NodeConfig struct {
+ Name string `yaml:"name"`
+ Address string `yaml:"grpc_address"`
+ CACertPath string `yaml:"ca_cert_path"`
+ TLSEnabled bool `yaml:"tls_enabled"`
+}
+
+// NewService creates a new file discovery service.
+func NewService(cfg Config) (*Service, error) {
+ if cfg.FilePath == "" {
+ return nil, errors.New("file path cannot be empty")
+ }
+
+ // validate file exists and is readable
+ if _, err := os.Stat(cfg.FilePath); err != nil {
+ return nil, fmt.Errorf("failed to access file path %s: %w",
cfg.FilePath, err)
+ }
+
+ svc := &Service{
+ filePath: cfg.FilePath,
+ nodeCache: make(map[string]*databasev1.Node),
+ handlers: make(map[string]schema.EventHandler),
+ closer: run.NewCloser(1),
+ log: logger.GetLogger("metadata-discovery-file"),
+ grpcTimeout: cfg.GRPCTimeout,
+ fetchInterval: cfg.FetchInterval,
+ }
+
+ return svc, nil
+}
+
+// Start begins the file discovery background process.
+func (s *Service) Start(ctx context.Context) error {
+ s.log.Debug().Str("file_path", s.filePath).Msg("Starting file-based
node discovery service")
+
+ // initial load
+ if err := s.loadAndParseFile(ctx); err != nil {
+ return fmt.Errorf("failed to load initial configuration: %w",
err)
+ }
+
+ // start periodic fetch loop
+ go s.periodicFetch(ctx)
+
+ return nil
+}
+
+func (s *Service) loadAndParseFile(ctx context.Context) error {
+ startTime := time.Now()
+ var parseErr error
+ defer func() {
+ if s.metrics != nil {
+ duration := time.Since(startTime)
+ s.metrics.fileLoadCount.Inc(1)
+ s.metrics.fileLoadDuration.Observe(duration.Seconds())
+ if parseErr != nil {
+ s.metrics.fileLoadFailedCount.Inc(1)
+ }
+ }
+ }()
+
+ data, err := os.ReadFile(s.filePath)
+ if err != nil {
+ parseErr = fmt.Errorf("failed to read file: %w", err)
+ return parseErr
+ }
+
+ var cfg NodeFileConfig
+ if err := yaml.Unmarshal(data, &cfg); err != nil {
+ parseErr = fmt.Errorf("failed to parse YAML: %w", err)
+ return parseErr
+ }
+
+ // validate required fields
+ for idx, node := range cfg.Nodes {
+ if node.Address == "" {
+ parseErr = fmt.Errorf("node %s at index %d is missing
required field: grpc_address", node.Name, idx)
+ return parseErr
+ }
+ if node.TLSEnabled && node.CACertPath == "" {
+ parseErr = fmt.Errorf("node %s at index %d has TLS
enabled but missing ca_cert_path", node.Name, idx)
+ return parseErr
+ }
+ }
+
+ // update cache
+ s.updateNodeCache(ctx, cfg.Nodes)
+
+ s.log.Debug().Int("node_count", len(cfg.Nodes)).Msg("Successfully
loaded configuration file")
+ return nil
+}
+
+func (s *Service) fetchNodeMetadata(ctx context.Context, nodeConfig
NodeConfig) (*databasev1.Node, error) {
+ ctxTimeout, cancel := context.WithTimeout(ctx, s.grpcTimeout)
+ defer cancel()
+
+ // prepare TLS options
+ dialOpts, err := grpchelper.SecureOptions(nil, nodeConfig.TLSEnabled,
false, nodeConfig.CACertPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to load TLS config for node %s:
%w", nodeConfig.Name, err)
+ }
+
+ // connect to node
+ // nolint:contextcheck
+ conn, connErr := grpchelper.Conn(nodeConfig.Address, s.grpcTimeout,
dialOpts...)
+ if connErr != nil {
+ return nil, fmt.Errorf("failed to connect to %s: %w",
nodeConfig.Address, connErr)
+ }
+ defer conn.Close()
+
+ // query metadata of the node
+ client := databasev1.NewNodeQueryServiceClient(conn)
+ resp, callErr := client.GetCurrentNode(ctxTimeout,
&databasev1.GetCurrentNodeRequest{})
+ if callErr != nil {
+ return nil, fmt.Errorf("failed to get current node from %s:
%w", nodeConfig.Address, callErr)
+ }
+
+ return resp.GetNode(), nil
+}
+
+func (s *Service) updateNodeCache(ctx context.Context, newNodes []NodeConfig) {
+ for _, n := range newNodes {
+ s.cacheMutex.RLock()
+ _, exists := s.nodeCache[n.Address]
+ s.cacheMutex.RUnlock()
+
+ if !exists {
+ // fetch node metadata from gRPC
+ node, fetchErr := s.fetchNodeMetadata(ctx, n)
+ if fetchErr != nil {
+ s.log.Warn().
+ Err(fetchErr).
+ Str("node", n.Name).
+ Str("address", n.Address).
+ Msg("Failed to fetch node metadata,
will skip")
+ continue
+ }
+
+ s.cacheMutex.Lock()
+ if _, alreadyAdded := s.nodeCache[n.Address];
!alreadyAdded {
+ s.nodeCache[n.Address] = node
+
+ // notify handlers after releasing lock
+ s.notifyHandlers(schema.Metadata{
+ TypeMeta: schema.TypeMeta{
+ Kind: schema.KindNode,
+ Name:
node.GetMetadata().GetName(),
+ },
+ Spec: node,
+ }, true)
+
+ s.log.Debug().
+ Str("address", n.Address).
+ Str("name",
node.GetMetadata().GetName()).
+ Msg("New node discovered and added to
cache")
+ }
+ s.cacheMutex.Unlock()
+ }
+ }
+
+ // collect nodes to delete first
+ allAddr := make(map[string]bool)
+ for _, n := range newNodes {
+ allAddr[n.Address] = true
+ }
+ s.cacheMutex.Lock()
+ nodesToDelete := make(map[string]*databasev1.Node)
+ for addr, node := range s.nodeCache {
+ if !allAddr[addr] {
+ nodesToDelete[addr] = node
+ }
+ }
+
+ // delete from cache while still holding lock
+ for addr, node := range nodesToDelete {
+ delete(s.nodeCache, addr)
+ s.log.Debug().
+ Str("address", addr).
+ Str("name", node.GetMetadata().GetName()).
+ Msg("Node removed from cache (no longer in file)")
+ }
+ cacheSize := len(s.nodeCache)
+ s.cacheMutex.Unlock()
+
+ // Notify handlers after releasing lock
+ for _, node := range nodesToDelete {
+ s.notifyHandlers(schema.Metadata{
+ TypeMeta: schema.TypeMeta{
+ Kind: schema.KindNode,
+ Name: node.GetMetadata().GetName(),
+ },
+ Spec: node,
+ }, false)
+ }
+ // update metrics
+ if s.metrics != nil {
+ s.metrics.totalNodesCount.Set(float64(cacheSize))
+ }
+}
+
+func (s *Service) notifyHandlers(metadata schema.Metadata, isAddOrUpdate bool)
{
+ s.handlersMutex.RLock()
+ defer s.handlersMutex.RUnlock()
+
+ for _, handler := range s.handlers {
+ if isAddOrUpdate {
+ handler.OnAddOrUpdate(metadata)
+ } else {
+ handler.OnDelete(metadata)
+ }
+ }
+}
+
+// RegisterHandler registers an event handler for node changes.
+func (s *Service) RegisterHandler(name string, handler schema.EventHandler) {
+ s.handlersMutex.Lock()
+ defer s.handlersMutex.Unlock()
+
+ s.handlers[name] = handler
+ s.log.Debug().Str("handler", name).Msg("Registered file node discovery handler")
+}
+
+// SetMetrics sets the OMR metrics.
+func (s *Service) SetMetrics(factory observability.Factory) {
+ s.metrics = newMetrics(factory)
+}
+
+// Close stops the file discovery service.
+func (s *Service) Close() error {
+ s.closer.Done()
+ s.closer.CloseThenWait()
+
+ return nil
+}
+
+// ListNode lists all existing nodes from cache.
+func (s *Service) ListNode(_ context.Context, role databasev1.Role) ([]*databasev1.Node, error) {
+ s.cacheMutex.RLock()
+ defer s.cacheMutex.RUnlock()
+
+ var result []*databasev1.Node
+ for _, node := range s.nodeCache {
+ // file mode doesn't support role filtering as roles are not stored in config
+ // return all nodes when ROLE_UNSPECIFIED, otherwise return empty list
+ if role == databasev1.Role_ROLE_UNSPECIFIED {
+ result = append(result, node)
+ }
+ }
+
+ return result, nil
+}
+
+// GetNode gets a specific node from cache.
+func (s *Service) GetNode(_ context.Context, nodeName string) (*databasev1.Node, error) {
+ s.cacheMutex.RLock()
+ defer s.cacheMutex.RUnlock()
+
+ if node, exists := s.nodeCache[nodeName]; exists {
+ return node, nil
+ }
+
+ return nil, fmt.Errorf("node %s not found", nodeName)
+}
+
+// RegisterNode is not supported in file discovery mode.
+func (s *Service) RegisterNode(_ context.Context, _ *databasev1.Node, _ bool) error {
+ return errors.New("manual node registration not supported in file discovery mode")
+}
+
+// UpdateNode is not supported in file discovery mode.
+func (s *Service) UpdateNode(_ context.Context, _ *databasev1.Node) error {
+ return errors.New("manual node update not supported in file discovery mode")
+}
+
+func (s *Service) periodicFetch(ctx context.Context) {
+ fetchTicker := time.NewTicker(s.fetchInterval)
Review Comment:
The ticker created in periodicFetch is never explicitly stopped with
fetchTicker.Stop(). While the goroutine will exit when the context is done, the
ticker should be stopped to prevent resource leaks. Consider adding defer
fetchTicker.Stop() after creating the ticker.
```suggestion
fetchTicker := time.NewTicker(s.fetchInterval)
defer fetchTicker.Stop()
```
##########
docs/operation/node-discovery.md:
##########
@@ -278,6 +279,102 @@ spec:
This creates DNS SRV record:
`_grpc._tcp.banyandb-data.default.svc.cluster.local`
+## File-Based Discovery
+
+### Overview
+
+File-based discovery provides a simple static configuration approach where nodes are defined in a YAML file. This mode is ideal for testing environments, small deployments, or scenarios where dynamic service discovery infrastructure is unavailable.
+
+The service monitors the configuration file for changes and automatically updates the node registry when the file is modified.
+
+### How it Works
+
+1. Read node configurations from a YAML file on startup
+2. Attempt to connect to each node via gRPC to fetch full metadata
+3. Successfully connected nodes are added to the cache
+4. Failed nodes are tracked separately and retried periodically
+5. Watch the file for changes and reload automatically
+6. Notify registered handlers when nodes are added or removed
+
+### Configuration Flags
+
+```shell
+# Mode selection
+--node-discovery-mode=file # Enable file mode
+
+# File path (required for file mode)
+--node-discovery-file-path=/path/to/nodes.yaml
+
+# gRPC settings
+--node-discovery-grpc-timeout=5s # Timeout for metadata fetch (default: 5s)
+
+# Retry settings
+--node-discovery-file-retry-interval=20s # Retry interval for failed nodes (default: 20s)
+```
+
+### YAML Configuration Format
+
+```yaml
+nodes:
+ - name: liaison-0
+ grpc_address: 192.168.1.10:18912
+ - name: data-hot-0
+ grpc_address: 192.168.1.20:17912
+ tls_enabled: true
+ ca_cert_path: /etc/banyandb/certs/ca.crt
+ - name: data-cold-0
+ grpc_address: 192.168.1.30:17912
+```
+
+**Configuration Fields:**
+
+- **name** (required): Identifier for the node
+- **grpc_address** (required): gRPC endpoint in `host:port` format
+- **tls_enabled** (optional): Enable TLS for gRPC connection (default: false)
+- **ca_cert_path** (optional): Path to CA certificate file (required when TLS is enabled)
+
+### Configuration Examples
+
+**Basic Configuration:**
+
+```shell
+banyand data \
+ --node-discovery-mode=file \
+ --node-discovery-file-path=/etc/banyandb/nodes.yaml
+```
+
+**With Custom Retry Interval:**
+
+```shell
+banyand liaison \
+ --node-discovery-mode=file \
+ --node-discovery-file-path=/etc/banyandb/nodes.yaml \
+ --node-discovery-file-retry-interval=30s
+```
+
+### Node Lifecycle
+
+#### Initial/Interval Load
+
+When the service starts:
+1. Reads the YAML configuration file
+2. Validates required fields (`grpc_address`)
+3. Attempts gRPC connection to each node
+4. Successfully connected nodes → added to cache
+5. Failed nodes → wait for the next interval
+
+### Error Handling
+
+**Startup Errors:**
+- Missing or invalid file path → service fails to start
+- Invalid YAML format → service fails to start
+- Missing required fields → service fails to start
+
+**Runtime Errors:**
+- gRPC connection failure → node added to retry queue
Review Comment:
The documentation mentions "node added to retry queue" but the actual
implementation in file.go does not use a retry queue. Failed nodes are simply
skipped during updateNodeCache and will be retried when the file is reloaded on
the next fetch interval. This terminology should be corrected to match the
actual implementation behavior.
```suggestion
- gRPC connection failure → node skipped; retried on next file reload
```
##########
banyand/metadata/discovery/file/file.go:
##########
@@ -0,0 +1,352 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package file implements file-based node discovery for distributed metadata management.
+package file
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "sync"
+ "time"
+
+ "gopkg.in/yaml.v3"
+
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// Service implements file-based node discovery.
+type Service struct {
+ nodeCache map[string]*databasev1.Node
+ closer *run.Closer
+ log *logger.Logger
+ metrics *metrics
+ handlers map[string]schema.EventHandler
+ filePath string
+ grpcTimeout time.Duration
+ fetchInterval time.Duration
+ cacheMutex sync.RWMutex
+ handlersMutex sync.RWMutex
+}
+
+// Config holds configuration for file discovery service.
+type Config struct {
+ FilePath string
+ GRPCTimeout time.Duration
+ FetchInterval time.Duration
+}
+
+// NodeFileConfig represents the YAML configuration file structure.
+type NodeFileConfig struct {
+ Nodes []NodeConfig `yaml:"nodes"`
+}
+
+// NodeConfig represents a single node configuration.
+type NodeConfig struct {
+ Name string `yaml:"name"`
+ Address string `yaml:"grpc_address"`
+ CACertPath string `yaml:"ca_cert_path"`
+ TLSEnabled bool `yaml:"tls_enabled"`
+}
+
+// NewService creates a new file discovery service.
+func NewService(cfg Config) (*Service, error) {
+ if cfg.FilePath == "" {
+ return nil, errors.New("file path cannot be empty")
+ }
+
+ // validate file exists and is readable
+ if _, err := os.Stat(cfg.FilePath); err != nil {
+ return nil, fmt.Errorf("failed to access file path %s: %w", cfg.FilePath, err)
+ }
+
+ svc := &Service{
+ filePath: cfg.FilePath,
+ nodeCache: make(map[string]*databasev1.Node),
+ handlers: make(map[string]schema.EventHandler),
+ closer: run.NewCloser(1),
+ log: logger.GetLogger("metadata-discovery-file"),
+ grpcTimeout: cfg.GRPCTimeout,
+ fetchInterval: cfg.FetchInterval,
+ }
+
+ return svc, nil
+}
+
+// Start begins the file discovery background process.
+func (s *Service) Start(ctx context.Context) error {
+ s.log.Debug().Str("file_path", s.filePath).Msg("Starting file-based node discovery service")
+
+ // initial load
+ if err := s.loadAndParseFile(ctx); err != nil {
+ return fmt.Errorf("failed to load initial configuration: %w", err)
+ }
+
+ // start periodic fetch loop
+ go s.periodicFetch(ctx)
+
+ return nil
+}
+
+func (s *Service) loadAndParseFile(ctx context.Context) error {
+ startTime := time.Now()
+ var parseErr error
+ defer func() {
+ if s.metrics != nil {
+ duration := time.Since(startTime)
+ s.metrics.fileLoadCount.Inc(1)
+ s.metrics.fileLoadDuration.Observe(duration.Seconds())
+ if parseErr != nil {
+ s.metrics.fileLoadFailedCount.Inc(1)
+ }
+ }
+ }()
+
+ data, err := os.ReadFile(s.filePath)
+ if err != nil {
+ parseErr = fmt.Errorf("failed to read file: %w", err)
+ return parseErr
+ }
+
+ var cfg NodeFileConfig
+ if err := yaml.Unmarshal(data, &cfg); err != nil {
+ parseErr = fmt.Errorf("failed to parse YAML: %w", err)
+ return parseErr
+ }
+
+ // validate required fields
+ for idx, node := range cfg.Nodes {
+ if node.Address == "" {
+ parseErr = fmt.Errorf("node %s at index %d is missing required field: grpc_address", node.Name, idx)
+ return parseErr
+ }
+ if node.TLSEnabled && node.CACertPath == "" {
+ parseErr = fmt.Errorf("node %s at index %d has TLS enabled but missing ca_cert_path", node.Name, idx)
+ return parseErr
+ }
+ }
+
+ // update cache
+ s.updateNodeCache(ctx, cfg.Nodes)
+
+ s.log.Debug().Int("node_count", len(cfg.Nodes)).Msg("Successfully loaded configuration file")
+ return nil
+}
+
+func (s *Service) fetchNodeMetadata(ctx context.Context, nodeConfig NodeConfig) (*databasev1.Node, error) {
+ ctxTimeout, cancel := context.WithTimeout(ctx, s.grpcTimeout)
+ defer cancel()
+
+ // prepare TLS options
+ dialOpts, err := grpchelper.SecureOptions(nil, nodeConfig.TLSEnabled, false, nodeConfig.CACertPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to load TLS config for node %s: %w", nodeConfig.Name, err)
+ }
+
+ // connect to node
+ // nolint:contextcheck
+ conn, connErr := grpchelper.Conn(nodeConfig.Address, s.grpcTimeout, dialOpts...)
+ if connErr != nil {
+ return nil, fmt.Errorf("failed to connect to %s: %w", nodeConfig.Address, connErr)
+ }
+ defer conn.Close()
+
+ // query metadata of the node
+ client := databasev1.NewNodeQueryServiceClient(conn)
+ resp, callErr := client.GetCurrentNode(ctxTimeout, &databasev1.GetCurrentNodeRequest{})
+ if callErr != nil {
+ return nil, fmt.Errorf("failed to get current node from %s: %w", nodeConfig.Address, callErr)
+ }
+
+ return resp.GetNode(), nil
+}
+
+func (s *Service) updateNodeCache(ctx context.Context, newNodes []NodeConfig) {
+ for _, n := range newNodes {
+ s.cacheMutex.RLock()
+ _, exists := s.nodeCache[n.Address]
+ s.cacheMutex.RUnlock()
+
+ if !exists {
+ // fetch node metadata from gRPC
+ node, fetchErr := s.fetchNodeMetadata(ctx, n)
+ if fetchErr != nil {
+ s.log.Warn().
+ Err(fetchErr).
+ Str("node", n.Name).
+ Str("address", n.Address).
+ Msg("Failed to fetch node metadata, will skip")
+ continue
+ }
+
+ s.cacheMutex.Lock()
+ if _, alreadyAdded := s.nodeCache[n.Address]; !alreadyAdded {
+ s.nodeCache[n.Address] = node
+
+ // notify handlers after releasing lock
+ s.notifyHandlers(schema.Metadata{
+ TypeMeta: schema.TypeMeta{
+ Kind: schema.KindNode,
+ Name: node.GetMetadata().GetName(),
+ },
+ Spec: node,
+ }, true)
+
+ s.log.Debug().
+ Str("address", n.Address).
+ Str("name", node.GetMetadata().GetName()).
+ Msg("New node discovered and added to cache")
+ }
+ s.cacheMutex.Unlock()
+ }
+ }
+
+ // collect nodes to delete first
+ allAddr := make(map[string]bool)
+ for _, n := range newNodes {
+ allAddr[n.Address] = true
+ }
+ s.cacheMutex.Lock()
+ nodesToDelete := make(map[string]*databasev1.Node)
+ for addr, node := range s.nodeCache {
+ if !allAddr[addr] {
+ nodesToDelete[addr] = node
+ }
+ }
+
+ // delete from cache while still holding lock
+ for addr, node := range nodesToDelete {
+ delete(s.nodeCache, addr)
+ s.log.Debug().
+ Str("address", addr).
+ Str("name", node.GetMetadata().GetName()).
+ Msg("Node removed from cache (no longer in file)")
+ }
+ cacheSize := len(s.nodeCache)
+ s.cacheMutex.Unlock()
+
+ // Notify handlers after releasing lock
+ for _, node := range nodesToDelete {
+ s.notifyHandlers(schema.Metadata{
+ TypeMeta: schema.TypeMeta{
+ Kind: schema.KindNode,
+ Name: node.GetMetadata().GetName(),
+ },
+ Spec: node,
+ }, false)
+ }
+ // update metrics
+ if s.metrics != nil {
+ s.metrics.totalNodesCount.Set(float64(cacheSize))
+ }
+}
+
+func (s *Service) notifyHandlers(metadata schema.Metadata, isAddOrUpdate bool) {
+ s.handlersMutex.RLock()
+ defer s.handlersMutex.RUnlock()
+
+ for _, handler := range s.handlers {
+ if isAddOrUpdate {
+ handler.OnAddOrUpdate(metadata)
+ } else {
+ handler.OnDelete(metadata)
+ }
+ }
+}
+
+// RegisterHandler registers an event handler for node changes.
+func (s *Service) RegisterHandler(name string, handler schema.EventHandler) {
+ s.handlersMutex.Lock()
+ defer s.handlersMutex.Unlock()
+
+ s.handlers[name] = handler
+ s.log.Debug().Str("handler", name).Msg("Registered file node discovery handler")
+}
+
+// SetMetrics sets the OMR metrics.
+func (s *Service) SetMetrics(factory observability.Factory) {
+ s.metrics = newMetrics(factory)
+}
+
+// Close stops the file discovery service.
+func (s *Service) Close() error {
+ s.closer.Done()
+ s.closer.CloseThenWait()
+
+ return nil
+}
+
+// ListNode lists all existing nodes from cache.
+func (s *Service) ListNode(_ context.Context, role databasev1.Role) ([]*databasev1.Node, error) {
+ s.cacheMutex.RLock()
+ defer s.cacheMutex.RUnlock()
+
+ var result []*databasev1.Node
+ for _, node := range s.nodeCache {
+ // file mode doesn't support role filtering as roles are not stored in config
+ // return all nodes when ROLE_UNSPECIFIED, otherwise return empty list
+ if role == databasev1.Role_ROLE_UNSPECIFIED {
+ result = append(result, node)
+ }
+ }
+
+ return result, nil
+}
+
+// GetNode gets a specific node from cache.
+func (s *Service) GetNode(_ context.Context, nodeName string) (*databasev1.Node, error) {
+ s.cacheMutex.RLock()
+ defer s.cacheMutex.RUnlock()
+
+ if node, exists := s.nodeCache[nodeName]; exists {
+ return node, nil
+ }
+
+ return nil, fmt.Errorf("node %s not found", nodeName)
Review Comment:
The parameter name does not match what the function actually looks up. The
doc comment says "GetNode gets a specific node from cache" and the parameter
is named "nodeName", but nodeCache is keyed by the node's address
(grpc_address), so the lookup is by address, not by name. The error message
"node %s not found" matches GetNode in dns.go (line 604), so the message
format itself is consistent; the mismatch is semantic. Consider renaming the
parameter so that the signature and the error message both describe the value
used as the cache key.
```suggestion
func (s *Service) GetNode(_ context.Context, nodeAddress string) (*databasev1.Node, error) {
	s.cacheMutex.RLock()
	defer s.cacheMutex.RUnlock()
	if node, exists := s.nodeCache[nodeAddress]; exists {
		return node, nil
	}
	return nil, fmt.Errorf("node %s not found", nodeAddress)
```
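If callers do pass a node name rather than an address (this has not been
checked against the call sites), renaming the parameter would not be enough.
One alternative is to keep the address-keyed cache and match on the name while
scanning. The `node` struct and `getNodeByName` below are hypothetical
stand-ins for *databasev1.Node and GetNode, shown only to illustrate the idea.

```go
package main

import "fmt"

// node is a hypothetical stand-in for *databasev1.Node.
type node struct {
	Name    string
	Address string
}

// getNodeByName scans a cache keyed by gRPC address and matches on the node
// name, so callers can pass a name even though it is not the map key.
func getNodeByName(cache map[string]node, name string) (node, error) {
	for _, n := range cache {
		if n.Name == name {
			return n, nil
		}
	}
	return node{}, fmt.Errorf("node %s not found", name)
}

func main() {
	cache := map[string]node{
		"192.168.1.20:17912": {Name: "data-hot-0", Address: "192.168.1.20:17912"},
	}
	n, err := getNodeByName(cache, "data-hot-0")
	fmt.Println(n.Address, err)
}
```

The scan is linear in the number of cached nodes, which is likely acceptable
for the small, static deployments that file discovery targets.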
##########
banyand/metadata/discovery/file/file.go:
##########
@@ -0,0 +1,352 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package file implements file-based node discovery for distributed metadata management.
+package file
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "sync"
+ "time"
+
+ "gopkg.in/yaml.v3"
+
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// Service implements file-based node discovery.
+type Service struct {
+ nodeCache map[string]*databasev1.Node
+ closer *run.Closer
+ log *logger.Logger
+ metrics *metrics
+ handlers map[string]schema.EventHandler
+ filePath string
+ grpcTimeout time.Duration
+ fetchInterval time.Duration
+ cacheMutex sync.RWMutex
+ handlersMutex sync.RWMutex
+}
+
+// Config holds configuration for file discovery service.
+type Config struct {
+ FilePath string
+ GRPCTimeout time.Duration
+ FetchInterval time.Duration
+}
+
+// NodeFileConfig represents the YAML configuration file structure.
+type NodeFileConfig struct {
+ Nodes []NodeConfig `yaml:"nodes"`
+}
+
+// NodeConfig represents a single node configuration.
+type NodeConfig struct {
+ Name string `yaml:"name"`
+ Address string `yaml:"grpc_address"`
+ CACertPath string `yaml:"ca_cert_path"`
+ TLSEnabled bool `yaml:"tls_enabled"`
+}
+
+// NewService creates a new file discovery service.
+func NewService(cfg Config) (*Service, error) {
+ if cfg.FilePath == "" {
+ return nil, errors.New("file path cannot be empty")
+ }
+
+ // validate file exists and is readable
+ if _, err := os.Stat(cfg.FilePath); err != nil {
+ return nil, fmt.Errorf("failed to access file path %s: %w", cfg.FilePath, err)
+ }
+
+ svc := &Service{
+ filePath: cfg.FilePath,
+ nodeCache: make(map[string]*databasev1.Node),
+ handlers: make(map[string]schema.EventHandler),
+ closer: run.NewCloser(1),
+ log: logger.GetLogger("metadata-discovery-file"),
+ grpcTimeout: cfg.GRPCTimeout,
+ fetchInterval: cfg.FetchInterval,
+ }
+
+ return svc, nil
+}
+
+// Start begins the file discovery background process.
+func (s *Service) Start(ctx context.Context) error {
+ s.log.Debug().Str("file_path", s.filePath).Msg("Starting file-based node discovery service")
+
+ // initial load
+ if err := s.loadAndParseFile(ctx); err != nil {
+ return fmt.Errorf("failed to load initial configuration: %w", err)
+ }
+
+ // start periodic fetch loop
+ go s.periodicFetch(ctx)
+
+ return nil
+}
+
+func (s *Service) loadAndParseFile(ctx context.Context) error {
+ startTime := time.Now()
+ var parseErr error
+ defer func() {
+ if s.metrics != nil {
+ duration := time.Since(startTime)
+ s.metrics.fileLoadCount.Inc(1)
+ s.metrics.fileLoadDuration.Observe(duration.Seconds())
+ if parseErr != nil {
+ s.metrics.fileLoadFailedCount.Inc(1)
+ }
+ }
+ }()
+
+ data, err := os.ReadFile(s.filePath)
+ if err != nil {
+ parseErr = fmt.Errorf("failed to read file: %w", err)
+ return parseErr
+ }
+
+ var cfg NodeFileConfig
+ if err := yaml.Unmarshal(data, &cfg); err != nil {
+ parseErr = fmt.Errorf("failed to parse YAML: %w", err)
+ return parseErr
+ }
+
+ // validate required fields
+ for idx, node := range cfg.Nodes {
+ if node.Address == "" {
+ parseErr = fmt.Errorf("node %s at index %d is missing required field: grpc_address", node.Name, idx)
+ return parseErr
+ }
+ if node.TLSEnabled && node.CACertPath == "" {
+ parseErr = fmt.Errorf("node %s at index %d has TLS enabled but missing ca_cert_path", node.Name, idx)
+ return parseErr
+ }
+ }
+
+ // update cache
+ s.updateNodeCache(ctx, cfg.Nodes)
+
+ s.log.Debug().Int("node_count", len(cfg.Nodes)).Msg("Successfully loaded configuration file")
+ return nil
+}
+
+func (s *Service) fetchNodeMetadata(ctx context.Context, nodeConfig NodeConfig) (*databasev1.Node, error) {
+ ctxTimeout, cancel := context.WithTimeout(ctx, s.grpcTimeout)
+ defer cancel()
+
+ // prepare TLS options
+ dialOpts, err := grpchelper.SecureOptions(nil, nodeConfig.TLSEnabled, false, nodeConfig.CACertPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to load TLS config for node %s: %w", nodeConfig.Name, err)
+ }
+
+ // connect to node
+ // nolint:contextcheck
+ conn, connErr := grpchelper.Conn(nodeConfig.Address, s.grpcTimeout, dialOpts...)
+ if connErr != nil {
+ return nil, fmt.Errorf("failed to connect to %s: %w", nodeConfig.Address, connErr)
+ }
+ defer conn.Close()
+
+ // query metadata of the node
+ client := databasev1.NewNodeQueryServiceClient(conn)
+ resp, callErr := client.GetCurrentNode(ctxTimeout, &databasev1.GetCurrentNodeRequest{})
+ if callErr != nil {
+ return nil, fmt.Errorf("failed to get current node from %s: %w", nodeConfig.Address, callErr)
+ }
+
+ return resp.GetNode(), nil
+}
+
+func (s *Service) updateNodeCache(ctx context.Context, newNodes []NodeConfig) {
+ for _, n := range newNodes {
+ s.cacheMutex.RLock()
+ _, exists := s.nodeCache[n.Address]
+ s.cacheMutex.RUnlock()
+
+ if !exists {
+ // fetch node metadata from gRPC
+ node, fetchErr := s.fetchNodeMetadata(ctx, n)
+ if fetchErr != nil {
+ s.log.Warn().
+ Err(fetchErr).
+ Str("node", n.Name).
+ Str("address", n.Address).
+ Msg("Failed to fetch node metadata, will skip")
+ continue
+ }
+
+ s.cacheMutex.Lock()
+ if _, alreadyAdded := s.nodeCache[n.Address]; !alreadyAdded {
+ s.nodeCache[n.Address] = node
+
+ // notify handlers after releasing lock
+ s.notifyHandlers(schema.Metadata{
+ TypeMeta: schema.TypeMeta{
+ Kind: schema.KindNode,
+ Name: node.GetMetadata().GetName(),
+ },
+ Spec: node,
+ }, true)
+
+ s.log.Debug().
+ Str("address", n.Address).
+ Str("name", node.GetMetadata().GetName()).
+ Msg("New node discovered and added to cache")
+ }
+ s.cacheMutex.Unlock()
Review Comment:
The comment on line 207 says "notify handlers after releasing lock", but
notifyHandlers is called while cacheMutex is still held (locked on line 203,
released on line 221). This is a concurrency issue: a handler that blocks, or
that calls back into a method acquiring the same lock, could cause deadlocks
or performance problems. The implementation should follow the comment's intent
and call notifyHandlers after releasing the lock, as is already done for
deletions on lines 249-258.
```suggestion
var (
	added    bool
	metadata schema.Metadata
)
s.cacheMutex.Lock()
if _, alreadyAdded := s.nodeCache[n.Address]; !alreadyAdded {
	s.nodeCache[n.Address] = node
	added = true
	metadata = schema.Metadata{
		TypeMeta: schema.TypeMeta{
			Kind: schema.KindNode,
			Name: node.GetMetadata().GetName(),
		},
		Spec: node,
	}
}
s.cacheMutex.Unlock()
if added {
	// notify handlers after releasing lock
	s.notifyHandlers(metadata, true)
	s.log.Debug().
		Str("address", n.Address).
		Str("name", node.GetMetadata().GetName()).
		Msg("New node discovered and added to cache")
}
```
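A reduced, runnable model of why the ordering matters. The `registry` type and
its `onAdded` callback are hypothetical; they only mirror the shape of the
Service cache and its handlers. The handler here calls back into a method that
takes the read lock, which would block forever if it ran while `add` still
held the write lock, because sync.RWMutex is not reentrant.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical, reduced model of the Service cache.
type registry struct {
	mu      sync.RWMutex
	nodes   map[string]string
	onAdded func(name string)
}

// size takes the read lock, as ListNode and GetNode do in file.go.
func (r *registry) size() int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.nodes)
}

// add mutates the map under the write lock and records whether a
// notification is due; the handler runs only after the lock is released.
func (r *registry) add(addr, name string) bool {
	r.mu.Lock()
	_, exists := r.nodes[addr]
	if !exists {
		r.nodes[addr] = name
	}
	r.mu.Unlock()

	if !exists && r.onAdded != nil {
		r.onAdded(name) // safe to call back into size() here
	}
	return !exists
}

func main() {
	r := &registry{nodes: map[string]string{}}
	r.onAdded = func(name string) {
		fmt.Println("added", name, "cache size", r.size())
	}
	r.add("192.168.1.10:18912", "liaison-0")
}
```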
##########
banyand/metadata/discovery/file/file.go:
##########
@@ -0,0 +1,352 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package file implements file-based node discovery for distributed metadata management.
+package file
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "sync"
+ "time"
+
+ "gopkg.in/yaml.v3"
+
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// Service implements file-based node discovery.
+type Service struct {
+ nodeCache map[string]*databasev1.Node
+ closer *run.Closer
+ log *logger.Logger
+ metrics *metrics
+ handlers map[string]schema.EventHandler
+ filePath string
+ grpcTimeout time.Duration
+ fetchInterval time.Duration
+ cacheMutex sync.RWMutex
+ handlersMutex sync.RWMutex
+}
+
+// Config holds configuration for file discovery service.
+type Config struct {
+ FilePath string
+ GRPCTimeout time.Duration
+ FetchInterval time.Duration
+}
+
+// NodeFileConfig represents the YAML configuration file structure.
+type NodeFileConfig struct {
+ Nodes []NodeConfig `yaml:"nodes"`
+}
+
+// NodeConfig represents a single node configuration.
+type NodeConfig struct {
+ Name string `yaml:"name"`
+ Address string `yaml:"grpc_address"`
+ CACertPath string `yaml:"ca_cert_path"`
+ TLSEnabled bool `yaml:"tls_enabled"`
+}
+
+// NewService creates a new file discovery service.
+func NewService(cfg Config) (*Service, error) {
+ if cfg.FilePath == "" {
+ return nil, errors.New("file path cannot be empty")
+ }
+
+ // validate file exists and is readable
+ if _, err := os.Stat(cfg.FilePath); err != nil {
+ return nil, fmt.Errorf("failed to access file path %s: %w", cfg.FilePath, err)
+ }
+
+ svc := &Service{
+ filePath: cfg.FilePath,
+ nodeCache: make(map[string]*databasev1.Node),
+ handlers: make(map[string]schema.EventHandler),
+ closer: run.NewCloser(1),
+ log: logger.GetLogger("metadata-discovery-file"),
+ grpcTimeout: cfg.GRPCTimeout,
+ fetchInterval: cfg.FetchInterval,
+ }
+
+ return svc, nil
+}
+
+// Start begins the file discovery background process.
+func (s *Service) Start(ctx context.Context) error {
+ s.log.Debug().Str("file_path", s.filePath).Msg("Starting file-based node discovery service")
+
+ // initial load
+ if err := s.loadAndParseFile(ctx); err != nil {
+ return fmt.Errorf("failed to load initial configuration: %w", err)
+ }
+
+ // start periodic fetch loop
+ go s.periodicFetch(ctx)
+
+ return nil
+}
+
+func (s *Service) loadAndParseFile(ctx context.Context) error {
+ startTime := time.Now()
+ var parseErr error
+ defer func() {
+ if s.metrics != nil {
+ duration := time.Since(startTime)
+ s.metrics.fileLoadCount.Inc(1)
+ s.metrics.fileLoadDuration.Observe(duration.Seconds())
+ if parseErr != nil {
+ s.metrics.fileLoadFailedCount.Inc(1)
+ }
+ }
+ }()
+
+ data, err := os.ReadFile(s.filePath)
+ if err != nil {
+ parseErr = fmt.Errorf("failed to read file: %w", err)
+ return parseErr
+ }
+
+ var cfg NodeFileConfig
+ if err := yaml.Unmarshal(data, &cfg); err != nil {
+ parseErr = fmt.Errorf("failed to parse YAML: %w", err)
+ return parseErr
+ }
+
+ // validate required fields
+ for idx, node := range cfg.Nodes {
+ if node.Address == "" {
+ parseErr = fmt.Errorf("node %s at index %d is missing required field: grpc_address", node.Name, idx)
+ return parseErr
+ }
+ if node.TLSEnabled && node.CACertPath == "" {
+ parseErr = fmt.Errorf("node %s at index %d has TLS enabled but missing ca_cert_path", node.Name, idx)
+ return parseErr
+ }
+ }
+
+ // update cache
+ s.updateNodeCache(ctx, cfg.Nodes)
+
+ s.log.Debug().Int("node_count", len(cfg.Nodes)).Msg("Successfully loaded configuration file")
+ return nil
+}
+
+func (s *Service) fetchNodeMetadata(ctx context.Context, nodeConfig NodeConfig) (*databasev1.Node, error) {
+ ctxTimeout, cancel := context.WithTimeout(ctx, s.grpcTimeout)
+ defer cancel()
+
+ // prepare TLS options
+ dialOpts, err := grpchelper.SecureOptions(nil, nodeConfig.TLSEnabled, false, nodeConfig.CACertPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to load TLS config for node %s: %w", nodeConfig.Name, err)
+ }
+
+ // connect to node
+ // nolint:contextcheck
+ conn, connErr := grpchelper.Conn(nodeConfig.Address, s.grpcTimeout, dialOpts...)
+ if connErr != nil {
+ return nil, fmt.Errorf("failed to connect to %s: %w", nodeConfig.Address, connErr)
+ }
+ defer conn.Close()
+
+ // query metadata of the node
+ client := databasev1.NewNodeQueryServiceClient(conn)
+ resp, callErr := client.GetCurrentNode(ctxTimeout, &databasev1.GetCurrentNodeRequest{})
+ if callErr != nil {
+ return nil, fmt.Errorf("failed to get current node from %s: %w", nodeConfig.Address, callErr)
+ }
+
+ return resp.GetNode(), nil
+}
+
+func (s *Service) updateNodeCache(ctx context.Context, newNodes []NodeConfig) {
+ for _, n := range newNodes {
+ s.cacheMutex.RLock()
+ _, exists := s.nodeCache[n.Address]
+ s.cacheMutex.RUnlock()
+
+ if !exists {
+ // fetch node metadata from gRPC
+ node, fetchErr := s.fetchNodeMetadata(ctx, n)
+ if fetchErr != nil {
+ s.log.Warn().
+ Err(fetchErr).
+ Str("node", n.Name).
+ Str("address", n.Address).
+ Msg("Failed to fetch node metadata, will skip")
+ continue
+ }
+
+ s.cacheMutex.Lock()
+ if _, alreadyAdded := s.nodeCache[n.Address]; !alreadyAdded {
+ s.nodeCache[n.Address] = node
+
+ // notify handlers after releasing lock
+ s.notifyHandlers(schema.Metadata{
+ TypeMeta: schema.TypeMeta{
+ Kind: schema.KindNode,
+ Name: node.GetMetadata().GetName(),
+ },
+ Spec: node,
+ }, true)
+
+ s.log.Debug().
+ Str("address", n.Address).
+ Str("name", node.GetMetadata().GetName()).
+ Msg("New node discovered and added to cache")
+ }
+ s.cacheMutex.Unlock()
+ }
+ }
+
+ // collect nodes to delete first
+ allAddr := make(map[string]bool)
+ for _, n := range newNodes {
+ allAddr[n.Address] = true
+ }
+ s.cacheMutex.Lock()
+ nodesToDelete := make(map[string]*databasev1.Node)
+ for addr, node := range s.nodeCache {
+ if !allAddr[addr] {
+ nodesToDelete[addr] = node
+ }
+ }
+
+ // delete from cache while still holding lock
+ for addr, node := range nodesToDelete {
+ delete(s.nodeCache, addr)
+ s.log.Debug().
+ Str("address", addr).
+ Str("name", node.GetMetadata().GetName()).
+ Msg("Node removed from cache (no longer in file)")
+ }
+ cacheSize := len(s.nodeCache)
+ s.cacheMutex.Unlock()
+
+ // Notify handlers after releasing lock
+ for _, node := range nodesToDelete {
+ s.notifyHandlers(schema.Metadata{
+ TypeMeta: schema.TypeMeta{
+ Kind: schema.KindNode,
+ Name: node.GetMetadata().GetName(),
+ },
+ Spec: node,
+ }, false)
+ }
+ // update metrics
+ if s.metrics != nil {
+ s.metrics.totalNodesCount.Set(float64(cacheSize))
+ }
+}
+
+func (s *Service) notifyHandlers(metadata schema.Metadata, isAddOrUpdate bool) {
+ s.handlersMutex.RLock()
+ defer s.handlersMutex.RUnlock()
+
+ for _, handler := range s.handlers {
+ if isAddOrUpdate {
+ handler.OnAddOrUpdate(metadata)
+ } else {
+ handler.OnDelete(metadata)
+ }
+ }
+}
+
+// RegisterHandler registers an event handler for node changes.
+func (s *Service) RegisterHandler(name string, handler schema.EventHandler) {
+ s.handlersMutex.Lock()
+ defer s.handlersMutex.Unlock()
+
+ s.handlers[name] = handler
+ s.log.Debug().Str("handler", name).Msg("Registered file node discovery handler")
+}
+
+// SetMetrics sets the OMR metrics.
+func (s *Service) SetMetrics(factory observability.Factory) {
+ s.metrics = newMetrics(factory)
+}
+
+// Close stops the file discovery service.
+func (s *Service) Close() error {
+ s.closer.Done()
+ s.closer.CloseThenWait()
+
+ return nil
+}
+
+// ListNode lists all existing nodes from cache.
+func (s *Service) ListNode(_ context.Context, role databasev1.Role) ([]*databasev1.Node, error) {
+ s.cacheMutex.RLock()
+ defer s.cacheMutex.RUnlock()
+
+ var result []*databasev1.Node
+ for _, node := range s.nodeCache {
+ // file mode doesn't support role filtering as roles are not stored in config
+ // return all nodes when ROLE_UNSPECIFIED, otherwise return empty list
+ if role == databasev1.Role_ROLE_UNSPECIFIED {
Review Comment:
The comment states that file mode doesn't support role filtering and returns
an empty list for non-UNSPECIFIED roles. However, nodes fetched via gRPC
(fetchNodeMetadata) include role information in their metadata. Consider
fetching and storing role information from the nodes, then implementing proper
role filtering like DNS discovery does, rather than silently returning empty
results which could be confusing for users expecting role-based filtering to
work.
```suggestion
// return all nodes when ROLE_UNSPECIFIED, otherwise filter by node role
if role == databasev1.Role_ROLE_UNSPECIFIED || node.GetRole() == role {
```
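One caveat on the suggestion above: it assumes the generated Node type has a
singular GetRole() accessor. If the proto instead declares roles as a repeated
field (which should be verified against the generated databasev1 code), the
condition needs a membership test rather than an equality check. The sketch
below uses hypothetical local `Role` and `Node` types, not the generated ones,
and relies on the standard `slices` package (Go 1.21 or later).

```go
package main

import (
	"fmt"
	"slices"
)

// Role and Node are hypothetical stand-ins for the generated databasev1 types.
type Role int

const (
	RoleUnspecified Role = iota
	RoleMeta
	RoleData
	RoleLiaison
)

type Node struct {
	Name  string
	Roles []Role
}

// filterByRole returns every node when role is unspecified, otherwise only
// the nodes whose role list contains the requested role.
func filterByRole(nodes []Node, role Role) []Node {
	var result []Node
	for _, n := range nodes {
		if role == RoleUnspecified || slices.Contains(n.Roles, role) {
			result = append(result, n)
		}
	}
	return result
}

func main() {
	nodes := []Node{
		{Name: "data-hot-0", Roles: []Role{RoleData}},
		{Name: "liaison-0", Roles: []Role{RoleLiaison}},
	}
	fmt.Println(len(filterByRole(nodes, RoleData)), len(filterByRole(nodes, RoleUnspecified)))
}
```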
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.